Updated algo & working on math operators

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8f00cefa
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8f00cefa
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8f00cefa

Branch: refs/heads/master
Commit: 8f00cefa2a14756a65e0baad48db0d52e2fe66a3
Parents: 2e47b4c
Author: Lakshmi Prasanna Velineni <[email protected]>
Authored: Wed Aug 24 20:54:53 2016 -0700
Committer: Lakshmi Prasanna Velineni <[email protected]>
Committed: Thu Sep 1 09:12:43 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/apps/logstream/Application.java |   5 +-
 .../contrib/sqlite/SqliteStreamOperator.java    |   4 +-
 .../misc/algo/AbstractStreamPatternMatcher.java | 174 +++++++++++
 .../contrib/misc/algo/AllAfterMatchMap.java     | 120 ++++++++
 .../malhar/contrib/misc/algo/DistinctMap.java   | 113 +++++++
 .../malhar/contrib/misc/algo/FilterKeyVals.java | 163 ++++++++++
 .../contrib/misc/algo/FilterKeysHashMap.java    | 184 +++++++++++
 .../malhar/contrib/misc/algo/FilterKeysMap.java | 196 ++++++++++++
 .../malhar/contrib/misc/algo/FirstMatchMap.java | 117 +++++++
 .../apex/malhar/contrib/misc/algo/FirstN.java   | 113 +++++++
 .../contrib/misc/algo/FirstTillMatch.java       | 116 +++++++
 .../contrib/misc/algo/InsertSortDesc.java       | 136 +++++++++
 .../malhar/contrib/misc/algo/InvertIndex.java   | 146 +++++++++
 .../contrib/misc/algo/InvertIndexArray.java     | 130 ++++++++
 .../malhar/contrib/misc/algo/LastMatchMap.java  | 112 +++++++
 .../contrib/misc/algo/LeastFrequentKeyMap.java  | 149 +++++++++
 .../misc/algo/LeastFrequentKeyValueMap.java     | 107 +++++++
 .../contrib/misc/algo/MostFrequentKeyMap.java   | 142 +++++++++
 .../misc/algo/MostFrequentKeyValueMap.java      | 110 +++++++
 .../apex/malhar/contrib/misc/algo/Sampler.java  | 121 ++++++++
 .../apex/malhar/contrib/misc/math/Change.java   | 119 ++++++++
 .../malhar/contrib/misc/math/ChangeAlert.java   | 120 ++++++++
 .../contrib/misc/math/ChangeAlertKeyVal.java    | 131 ++++++++
 .../contrib/misc/math/ChangeAlertMap.java       | 125 ++++++++
 .../malhar/contrib/misc/math/ChangeKeyVal.java  | 125 ++++++++
 .../contrib/misc/math/CompareExceptMap.java     | 131 ++++++++
 .../malhar/contrib/misc/math/CompareMap.java    |  88 ++++++
 .../malhar/contrib/misc/math/CountKeyVal.java   | 116 +++++++
 .../malhar/contrib/misc/math/ExceptMap.java     | 104 +++++++
 .../apex/malhar/contrib/misc/math/Quotient.java | 111 +++++++
 .../malhar/contrib/misc/math/QuotientMap.java   | 239 +++++++++++++++
 .../malhar/contrib/misc/math/SumCountMap.java   | 305 +++++++++++++++++++
 .../streamquery/AbstractSqlStreamOperator.java  | 192 ++++++++++++
 .../misc/streamquery/DeleteOperator.java        |  88 ++++++
 .../streamquery/DerbySqlStreamOperator.java     | 200 ++++++++++++
 .../misc/streamquery/GroupByHavingOperator.java | 230 ++++++++++++++
 .../misc/streamquery/InnerJoinOperator.java     | 212 +++++++++++++
 .../misc/streamquery/OrderByOperator.java       | 181 +++++++++++
 .../contrib/misc/streamquery/OrderByRule.java   |  99 ++++++
 .../misc/streamquery/OuterJoinOperator.java     | 123 ++++++++
 .../streamquery/SelectFunctionOperator.java     | 129 ++++++++
 .../misc/streamquery/SelectOperator.java        | 113 +++++++
 .../misc/streamquery/SelectTopOperator.java     | 131 ++++++++
 .../misc/streamquery/UpdateOperator.java        | 111 +++++++
 .../streamquery/condition/BetweenCondition.java | 107 +++++++
 .../condition/CompoundCondition.java            | 132 ++++++++
 .../condition/EqualValueCondition.java          |  99 ++++++
 .../condition/HavingCompareValue.java           |  79 +++++
 .../streamquery/condition/HavingCondition.java  |  58 ++++
 .../misc/streamquery/condition/InCondition.java |  94 ++++++
 .../streamquery/condition/LikeCondition.java    | 105 +++++++
 .../streamquery/function/AverageFunction.java   |  82 +++++
 .../streamquery/function/CountFunction.java     |  87 ++++++
 .../streamquery/function/FirstLastFunction.java | 113 +++++++
 .../streamquery/function/FunctionIndex.java     |  95 ++++++
 .../streamquery/function/MaxMinFunction.java    | 105 +++++++
 .../misc/streamquery/function/SumFunction.java  |  64 ++++
 .../streamquery/index/BinaryExpression.java     |  75 +++++
 .../misc/streamquery/index/MidIndex.java        |  82 +++++
 .../streamquery/index/NegateExpression.java     |  61 ++++
 .../streamquery/index/RoundDoubleIndex.java     |  64 ++++
 .../misc/streamquery/index/StringCaseIndex.java |  66 ++++
 .../misc/streamquery/index/StringLenIndex.java  |  60 ++++
 .../misc/streamquery/index/SumExpression.java   |  65 ++++
 .../misc/streamquery/index/UnaryExpression.java |  78 +++++
 .../contrib/misc/streamquery/package-info.java  |  23 ++
 .../algo/AbstractStreamPatternMatcherTest.java  | 173 +++++++++++
 .../contrib/misc/algo/AllAfterMatchMapTest.java |  93 ++++++
 .../contrib/misc/algo/DistinctMapTest.java      | 111 +++++++
 .../contrib/misc/algo/FilterKeyValsTest.java    | 127 ++++++++
 .../misc/algo/FilterKeysHashMapTest.java        | 151 +++++++++
 .../contrib/misc/algo/FilterKeysMapTest.java    | 121 ++++++++
 .../contrib/misc/algo/FirstMatchMapTest.java    | 103 +++++++
 .../malhar/contrib/misc/algo/FirstNTest.java    | 123 ++++++++
 .../contrib/misc/algo/FirstTillMatchTest.java   | 110 +++++++
 .../contrib/misc/algo/InsertSortDescTest.java   |  97 ++++++
 .../contrib/misc/algo/InvertIndexArrayTest.java | 101 ++++++
 .../contrib/misc/algo/InvertIndexTest.java      | 104 +++++++
 .../contrib/misc/algo/LastMatchMapTest.java     | 105 +++++++
 .../misc/algo/LeastFrequentKeyMapTest.java      | 117 +++++++
 .../misc/algo/LeastFrequentKeyValueMapTest.java | 109 +++++++
 .../malhar/contrib/misc/algo/MatchMapTest.java  |  83 +++++
 .../misc/algo/MostFrequentKeyMapTest.java       | 118 +++++++
 .../misc/algo/MostFrequentKeyValueMapTest.java  | 109 +++++++
 .../malhar/contrib/misc/algo/SamplerTest.java   |  64 ++++
 .../misc/math/ChangeAlertKeyValTest.java        | 108 +++++++
 .../contrib/misc/math/ChangeAlertMapTest.java   | 116 +++++++
 .../contrib/misc/math/ChangeAlertTest.java      |  83 +++++
 .../contrib/misc/math/ChangeKeyValTest.java     | 112 +++++++
 .../malhar/contrib/misc/math/ChangeTest.java    |  85 ++++++
 .../contrib/misc/math/CompareExceptMapTest.java |  98 ++++++
 .../contrib/misc/math/CompareMapTest.java       |  83 +++++
 .../contrib/misc/math/CountKeyValTest.java      |  83 +++++
 .../malhar/contrib/misc/math/ExceptMapTest.java |  85 ++++++
 .../contrib/misc/math/QuotientMapTest.java      |  94 ++++++
 .../malhar/contrib/misc/math/QuotientTest.java  | 104 +++++++
 .../contrib/misc/math/SumCountMapTest.java      | 156 ++++++++++
 .../misc/streamquery/DeleteOperatorTest.java    |  80 +++++
 .../streamquery/FullOuterJoinOperatorTest.java  |  93 ++++++
 .../misc/streamquery/GroupByOperatorTest.java   |  97 ++++++
 .../misc/streamquery/HavingOperatorTest.java    |  99 ++++++
 .../misc/streamquery/InnerJoinOperatorTest.java |  92 ++++++
 .../streamquery/LeftOuterJoinOperatorTest.java  |  93 ++++++
 .../misc/streamquery/OrderByOperatorTest.java   |  95 ++++++
 .../streamquery/RightOuterJoinOperatorTest.java |  95 ++++++
 .../misc/streamquery/SelectOperatorTest.java    |  84 +++++
 .../misc/streamquery/SelectTopOperatorTest.java |  66 ++++
 .../misc/streamquery/UpdateOperatorTest.java    |  78 +++++
 .../advanced/BetweenConditionTest.java          |  90 ++++++
 .../advanced/CompoundConditionTest.java         |  95 ++++++
 .../streamquery/advanced/InConditionTest.java   |  93 ++++++
 .../streamquery/advanced/LikeConditionTest.java |  84 +++++
 .../streamquery/advanced/NegateIndexTest.java   |  78 +++++
 .../streamquery/advanced/SelectAverageTest.java |  78 +++++
 .../streamquery/advanced/SelectCountTest.java   |  79 +++++
 .../advanced/SelectFirstLastTest.java           |  79 +++++
 .../streamquery/advanced/SelectMaxMinTest.java  |  79 +++++
 .../misc/streamquery/advanced/SumIndexTest.java |  79 +++++
 demos/yahoofinance/pom.xml                      |  11 +
 .../yahoofinance/ApplicationWithDerbySQL.java   |   4 +-
 .../lib/algo/AbstractStreamPatternMatcher.java  | 173 -----------
 .../datatorrent/lib/algo/AllAfterMatchMap.java  | 118 -------
 .../lib/algo/CompareExceptCountMap.java         |   3 +-
 .../java/com/datatorrent/lib/algo/Distinct.java |   5 +-
 .../com/datatorrent/lib/algo/DistinctMap.java   | 111 -------
 .../com/datatorrent/lib/algo/FilterKeyVals.java | 161 ----------
 .../datatorrent/lib/algo/FilterKeysHashMap.java | 182 -----------
 .../com/datatorrent/lib/algo/FilterKeysMap.java | 194 ------------
 .../com/datatorrent/lib/algo/FilterValues.java  |  17 +-
 .../com/datatorrent/lib/algo/FirstMatchMap.java | 116 -------
 .../java/com/datatorrent/lib/algo/FirstN.java   | 112 -------
 .../datatorrent/lib/algo/FirstTillMatch.java    | 115 -------
 .../datatorrent/lib/algo/InsertSortDesc.java    | 135 --------
 .../com/datatorrent/lib/algo/InvertIndex.java   | 145 ---------
 .../datatorrent/lib/algo/InvertIndexArray.java  | 129 --------
 .../com/datatorrent/lib/algo/LastMatchMap.java  | 111 -------
 .../lib/algo/LeastFrequentKeyMap.java           | 149 ---------
 .../lib/algo/LeastFrequentKeyValueMap.java      | 106 -------
 .../com/datatorrent/lib/algo/MatchAllMap.java   |   3 +-
 .../com/datatorrent/lib/algo/MatchAnyMap.java   |   3 +-
 .../java/com/datatorrent/lib/algo/MatchMap.java |   2 +
 .../com/datatorrent/lib/algo/MergeSort.java     |   2 +
 .../datatorrent/lib/algo/MergeSortNumber.java   |   2 +
 .../lib/algo/MostFrequentKeyMap.java            | 141 ---------
 .../lib/algo/MostFrequentKeyValueMap.java       | 110 -------
 .../java/com/datatorrent/lib/algo/Sampler.java  | 119 --------
 .../com/datatorrent/lib/algo/TopNUnique.java    |   3 +-
 .../datatorrent/lib/join/AntiJoinOperator.java  |   2 +-
 .../java/com/datatorrent/lib/math/Average.java  |  29 +-
 .../java/com/datatorrent/lib/math/Change.java   | 117 -------
 .../com/datatorrent/lib/math/ChangeAlert.java   | 118 -------
 .../datatorrent/lib/math/ChangeAlertKeyVal.java | 129 --------
 .../datatorrent/lib/math/ChangeAlertMap.java    | 123 --------
 .../com/datatorrent/lib/math/ChangeKeyVal.java  | 123 --------
 .../datatorrent/lib/math/CompareExceptMap.java  | 129 --------
 .../com/datatorrent/lib/math/CompareMap.java    |  86 ------
 .../com/datatorrent/lib/math/CountKeyVal.java   | 114 -------
 .../com/datatorrent/lib/math/ExceptMap.java     | 102 -------
 .../java/com/datatorrent/lib/math/Quotient.java | 109 -------
 .../com/datatorrent/lib/math/QuotientMap.java   | 237 --------------
 .../com/datatorrent/lib/math/SumCountMap.java   | 303 ------------------
 .../streamquery/AbstractSqlStreamOperator.java  | 190 ------------
 .../lib/streamquery/DeleteOperator.java         |  86 ------
 .../lib/streamquery/DerbySqlStreamOperator.java | 197 ------------
 .../lib/streamquery/GroupByHavingOperator.java  | 260 ----------------
 .../lib/streamquery/InnerJoinOperator.java      | 210 -------------
 .../lib/streamquery/OrderByOperator.java        | 179 -----------
 .../lib/streamquery/OrderByRule.java            |  97 ------
 .../lib/streamquery/OuterJoinOperator.java      | 121 --------
 .../lib/streamquery/SelectFunctionOperator.java | 126 --------
 .../lib/streamquery/SelectOperator.java         | 111 -------
 .../lib/streamquery/SelectTopOperator.java      | 129 --------
 .../lib/streamquery/UpdateOperator.java         | 109 -------
 .../streamquery/condition/BetweenCondition.java | 103 -------
 .../condition/CompoundCondition.java            | 128 --------
 .../condition/EqualValueCondition.java          |  96 ------
 .../condition/HavingCompareValue.java           |  77 -----
 .../streamquery/condition/HavingCondition.java  |  56 ----
 .../lib/streamquery/condition/InCondition.java  |  90 ------
 .../condition/JoinColumnEqualCondition.java     |   1 -
 .../streamquery/condition/LikeCondition.java    | 102 -------
 .../streamquery/function/AverageFunction.java   |  80 -----
 .../lib/streamquery/function/CountFunction.java |  85 ------
 .../streamquery/function/FirstLastFunction.java | 111 -------
 .../lib/streamquery/function/FunctionIndex.java |  93 ------
 .../streamquery/function/MaxMinFunction.java    | 103 -------
 .../lib/streamquery/function/SumFunction.java   |  62 ----
 .../lib/streamquery/index/BinaryExpression.java |  72 -----
 .../lib/streamquery/index/MidIndex.java         |  78 -----
 .../lib/streamquery/index/NegateExpression.java |  59 ----
 .../lib/streamquery/index/RoundDoubleIndex.java |  60 ----
 .../lib/streamquery/index/StringCaseIndex.java  |  62 ----
 .../lib/streamquery/index/StringLenIndex.java   |  56 ----
 .../lib/streamquery/index/SumExpression.java    |  63 ----
 .../lib/streamquery/index/UnaryExpression.java  |  75 -----
 .../lib/streamquery/package-info.java           |  23 --
 .../algo/AbstractStreamPatternMatcherTest.java  | 173 -----------
 .../lib/algo/AllAfterMatchMapTest.java          |  92 ------
 .../lib/algo/CompareExceptCountMapTest.java     |   6 +-
 .../datatorrent/lib/algo/DistinctMapTest.java   | 110 -------
 .../datatorrent/lib/algo/FilterKeyValsTest.java | 126 --------
 .../lib/algo/FilterKeysHashMapTest.java         | 150 ---------
 .../datatorrent/lib/algo/FilterKeysMapTest.java | 120 --------
 .../datatorrent/lib/algo/FirstMatchMapTest.java | 102 -------
 .../com/datatorrent/lib/algo/FirstNTest.java    | 122 --------
 .../lib/algo/FirstTillMatchTest.java            | 109 -------
 .../lib/algo/InsertSortDescTest.java            |  96 ------
 .../lib/algo/InvertIndexArrayTest.java          | 100 ------
 .../datatorrent/lib/algo/InvertIndexTest.java   | 103 -------
 .../datatorrent/lib/algo/LastMatchMapTest.java  | 104 -------
 .../lib/algo/LeastFrequentKeyMapTest.java       | 116 -------
 .../lib/algo/LeastFrequentKeyValueMapTest.java  | 108 -------
 .../datatorrent/lib/algo/MatchAllMapTest.java   |   5 +-
 .../datatorrent/lib/algo/MatchAnyMapTest.java   |   6 +-
 .../com/datatorrent/lib/algo/MatchMapTest.java  |  81 -----
 .../lib/algo/MergeSortNumberTest.java           |   4 +-
 .../lib/algo/MostFrequentKeyMapTest.java        | 117 -------
 .../lib/algo/MostFrequentKeyValueMapTest.java   | 108 -------
 .../com/datatorrent/lib/algo/SamplerTest.java   |  63 ----
 .../datatorrent/lib/algo/TopNUniqueTest.java    |   5 +-
 .../lib/join/AntiJoinOperatorTest.java          |   1 +
 .../com/datatorrent/lib/math/AverageTest.java   |   4 +-
 .../lib/math/ChangeAlertKeyValTest.java         | 107 -------
 .../lib/math/ChangeAlertMapTest.java            | 115 -------
 .../datatorrent/lib/math/ChangeAlertTest.java   |  82 -----
 .../datatorrent/lib/math/ChangeKeyValTest.java  | 111 -------
 .../com/datatorrent/lib/math/ChangeTest.java    |  84 -----
 .../lib/math/CompareExceptMapTest.java          |  97 ------
 .../datatorrent/lib/math/CompareMapTest.java    |  82 -----
 .../datatorrent/lib/math/CountKeyValTest.java   |  82 -----
 .../com/datatorrent/lib/math/DivisionTest.java  |   1 +
 .../com/datatorrent/lib/math/ExceptMapTest.java |  83 -----
 .../lib/math/MultiplyByConstantTest.java        |   2 +
 .../datatorrent/lib/math/QuotientMapTest.java   |  92 ------
 .../com/datatorrent/lib/math/QuotientTest.java  | 102 -------
 .../com/datatorrent/lib/math/SigmaTest.java     |   3 +-
 .../datatorrent/lib/math/SumCountMapTest.java   | 154 ----------
 .../lib/streamquery/DeleteOperatorTest.java     |  77 -----
 .../streamquery/FullOuterJoinOperatorTest.java  |  93 ------
 .../lib/streamquery/GroupByOperatorTest.java    |  94 ------
 .../lib/streamquery/HavingOperatorTest.java     |  96 ------
 .../lib/streamquery/InnerJoinOperatorTest.java  |  91 ------
 .../streamquery/LeftOuterJoinOperatorTest.java  |  92 ------
 .../lib/streamquery/OrderByOperatorTest.java    |  93 ------
 .../streamquery/RightOuterJoinOperatorTest.java |  94 ------
 .../lib/streamquery/SelectOperatorTest.java     |  81 -----
 .../lib/streamquery/SelectTopOperatorTest.java  |  65 ----
 .../lib/streamquery/UpdateOperatorTest.java     |  76 -----
 .../advanced/BetweenConditionTest.java          |  87 ------
 .../advanced/CompoundConditionTest.java         |  92 ------
 .../streamquery/advanced/InConditionTest.java   |  90 ------
 .../streamquery/advanced/LikeConditionTest.java |  81 -----
 .../streamquery/advanced/NegateIndexTest.java   |  75 -----
 .../streamquery/advanced/SelectAverageTest.java |  75 -----
 .../streamquery/advanced/SelectCountTest.java   |  76 -----
 .../advanced/SelectFirstLastTest.java           |  76 -----
 .../streamquery/advanced/SelectMaxMinTest.java  |  76 -----
 .../lib/streamquery/advanced/SumIndexTest.java  |  76 -----
 samples/pom.xml                                 |   4 +-
 .../lib/algo/AllAfterMatchMapSample.java        |   4 +-
 .../samples/lib/math/ChangeSample.java          |   4 +-
 .../samples/lib/math/CompreMapSample.java       |   4 +-
 .../samples/lib/math/CountKeyValSample.java     |   4 +-
 263 files changed, 12997 insertions(+), 12767 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java
----------------------------------------------------------------------
diff --git a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java
index 0cd3f79..98dfebd 100644
--- a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java
+++ b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java
@@ -25,6 +25,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator;
+import org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
@@ -36,8 +39,6 @@ import com.datatorrent.lib.logs.MultiWindowDimensionAggregation;
 import com.datatorrent.lib.logs.MultiWindowDimensionAggregation.AggregateOperation;
 import com.datatorrent.lib.stream.Counter;
 import com.datatorrent.lib.stream.JsonByteArrayOperator;
-import com.datatorrent.lib.streamquery.SelectOperator;
-import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
 import com.datatorrent.lib.streamquery.index.ColumnIndex;
 import com.datatorrent.lib.util.AbstractDimensionTimeBucketOperator;
 import com.datatorrent.lib.util.DimensionTimeBucketSumOperator;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/com/datatorrent/contrib/sqlite/SqliteStreamOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/sqlite/SqliteStreamOperator.java b/contrib/src/main/java/com/datatorrent/contrib/sqlite/SqliteStreamOperator.java
index ee06272..2cb33e4 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/sqlite/SqliteStreamOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/sqlite/SqliteStreamOperator.java
@@ -21,8 +21,8 @@ package com.datatorrent.contrib.sqlite;
 import com.almworks.sqlite4java.SQLiteConnection;
 import com.almworks.sqlite4java.SQLiteException;
 import com.almworks.sqlite4java.SQLiteStatement;
-import com.datatorrent.lib.streamquery.AbstractSqlStreamOperator;
-import com.datatorrent.lib.streamquery.AbstractSqlStreamOperator.InputSchema.ColumnInfo;
+import org.apache.apex.malhar.contrib.misc.streamquery.AbstractSqlStreamOperator;
+import org.apache.apex.malhar.contrib.misc.streamquery.AbstractSqlStreamOperator.InputSchema.ColumnInfo;
 import com.datatorrent.api.Context.OperatorContext;
 import java.io.File;
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/AbstractStreamPatternMatcher.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/AbstractStreamPatternMatcher.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/AbstractStreamPatternMatcher.java
new file mode 100644
index 0000000..38b176c
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/AbstractStreamPatternMatcher.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.Iterator;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+@Deprecated
+/**
+ * <p>
+ * This operator searches for a given pattern in the input stream.<br>
+ * For e.g. If the pattern is defined as “aa” and your input events arrive in following manner “a”, “a”, “a”, then this operator
+ * will emit 2 matches for the given pattern. One matching event 1 and 2 and other matching 2 and 3.
+ * </p>
+ *
+ * <br>
+ * <b> StateFull : Yes, </b> Pattern is found over application window(s). <br>
+ * <b> Partitionable : No, </b> will yield wrong result. <br>
+ *
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>inputPort</b>: the port to receive input<br>
+ *
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>pattern</b>: The pattern that needs to be searched<br>
+ *
+ * @param <T> event type
+ *
+ * @since 2.0.0
+ * @deprecated
+ */
+
+@OperatorAnnotation(partitionable = false)
+public abstract class AbstractStreamPatternMatcher<T> extends BaseOperator
+{
+  /**
+   * The pattern to be searched in the input stream of events
+   */
+  @NotNull
+  private Pattern<T> pattern;
+
+  // this stores the index of the partial matches found so far
+  private List<MutableInt> partialMatches = Lists.newLinkedList();
+  private transient MutableInt patternLength;
+
+  /**
+   * Set the pattern that needs to be searched in the input stream of events
+   *
+   * @param pattern The pattern to be searched
+   */
+  public void setPattern(Pattern<T> pattern)
+  {
+    this.pattern = pattern;
+    partialMatches.clear();
+    patternLength = new MutableInt(pattern.getStates().length - 1);
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    patternLength = new MutableInt(pattern.getStates().length - 1);
+  }
+
+  /**
+   * Get the pattern that is searched in the input stream of events
+   *
+   * @return Returns the pattern searched
+   */
+  public Pattern<T> getPattern()
+  {
+    return pattern;
+  }
+
+  public transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T t)
+    {
+      if (pattern.checkState(t, 0)) {
+        partialMatches.add(new MutableInt(-1));
+      }
+      if (partialMatches.size() > 0) {
+        MutableInt tempInt;
+        Iterator<MutableInt> itr = partialMatches.iterator();
+        while (itr.hasNext()) {
+          tempInt = itr.next();
+          tempInt.increment();
+          if (!pattern.checkState(t, tempInt.intValue())) {
+            itr.remove();
+          } else if (tempInt.equals(patternLength)) {
+            itr.remove();
+            processPatternFound();
+          }
+        }
+      }
+    }
+  };
+
+  /**
+   * This function determines how to process the pattern found
+   */
+  public abstract void processPatternFound();
+
+  public static class Pattern<T>
+  {
+    /**
+     * The states of the pattern
+     */
+    @NotNull
+    private final T[] states;
+
+    //for kryo
+    private Pattern()
+    {
+      states = null;
+    }
+
+    public Pattern(@NotNull T[] states)
+    {
+      this.states = states;
+    }
+
+    /**
+     * Checks if the input state matches the state at index "index" of the pattern
+     *
+     * @param t     The input state
+     * @param index The index to match in the pattern
+     * @return True if the state exists at index "index" else false
+     */
+    public boolean checkState(T t, int index)
+    {
+      return states[index].equals(t);
+    }
+
+    /**
+     * Get the states of the pattern
+     *
+     * @return The states of the pattern
+     */
+    public T[] getStates()
+    {
+      return states;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/AllAfterMatchMap.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/AllAfterMatchMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/AllAfterMatchMap.java
new file mode 100644
index 0000000..44118d5
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/AllAfterMatchMap.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
+import com.datatorrent.lib.util.BaseMatchOperator;
+
+/**
+ * This operator takes Maps, whose values are numbers, as input tuples.&nbsp;
+ * It then performs a numeric comparison on the values corresponding to one of the keys in the input tuple maps.&nbsp;
+ * All tuples processed by the operator before the first successful comparison are not output by the operator,
+ * all tuples processed by the operator after and including a successful comparison are output by the operator.
+ *
+ * <p>
+ * A compare metric is done on input tuple based on the property "key",
+ * "value", and "cmp" type. All tuples are emitted (inclusive) once a match is made.
+ * The comparison is done by getting double value from the Number.
+ * This module is a pass through<br>
+ * <br>
+ * <b> StateFull : Yes, </b> Count is aggregated over application window(s). <br>
+ * <b> Partitions : No, </b> will yield wrong result. <br>
+ * <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects Map&lt;K,V extends Number&gt;<br>
+ * <b>allafter</b>: emits Map&lt;K,V extends Number&gt; if compare function
+ * returns true<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>key</b>: The key on which compare is done<br>
+ * <b>value</b>: The value to compare with<br>
+ * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq",
+ * "neq", "gt", "gte". Default is "eq"<br>
+ * <br>
+ * <b>Specific compile time checks</b>:<br>
+ * Key must be non empty<br>
+ * Value must be able to convert to a "double"<br>
+ * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt",
+ * "gte"<br>
+ * <b>Specific run time checks</b>: None<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Emit All After Match (Number)
+ * @category Rules and Alerts
+ * @tags filter, compare, numeric, key value
+ *
+ * @since 0.3.2
+ * @deprecated
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = false)
+public class AllAfterMatchMap<K, V extends Number> extends
+    BaseMatchOperator<K, V>
+{
+  /**
+   * The input port on which tuples are received.
+   */
+  public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
+  {
+    /**
+     * Process HashMap<K,V> and emit all tuples at and after match
+     */
+    @Override
+    public void process(Map<K, V> tuple)
+    {
+      if (doemit) {
+        allafter.emit(cloneTuple(tuple));
+        return;
+      }
+      V v = tuple.get(getKey());
+      if (v == null) { // error tuple
+        return;
+      }
+      if (compareValue(v.doubleValue())) {
+        doemit = true;
+        allafter.emit(cloneTuple(tuple));
+      }
+    }
+  };
+
+  /**
+   * The output port on which all tuples after a match are emitted.
+   */
+  public final transient DefaultOutputPort<HashMap<K, V>> allafter = new DefaultOutputPort<HashMap<K, V>>();
+  boolean doemit = false;
+
+  /**
+   * Resets the matched variable
+   *
+   * @param windowId
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    doemit = false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/DistinctMap.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/DistinctMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/DistinctMap.java
new file mode 100644
index 0000000..426c2e5
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/DistinctMap.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
+import com.datatorrent.lib.util.BaseKeyValueOperator;
+import com.datatorrent.lib.util.UnifierHashMap;
+
+/**
+ * This operator computes and emits distinct key,val pairs (i.e. drops
+ * duplicates).
+ * <p>
+ * Computes and emits distinct key,val pairs (i.e drops duplicates)
+ * </p>
+ * <p>
+ * This is a pass through operator<br>
+ * <br>
+ * This module is same as a "FirstOf" metric on any key,val pair. At end of 
window all data is flushed.<br>
+ * <br>
+ * <b>StateFull : Yes, </b> tuples are compared against the pairs already seen
+ * in the current window; the cache is cleared at end of window. <br>
+ * <b>Partitions : Yes, </b> distinct output is unified by unifier hash map 
operator. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: Input data port expects Map&lt;K,V&gt;<br>
+ * <b>distinct</b>: Output data port, emits HashMap&lt;K,V&gt;(1)<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Distinct Key Value Merge
+ * @category Stream Manipulators
+ * @tags filter, unique, key value
+ *
+ * @since 0.3.2
+ * @deprecated
+ */
+
+@Deprecated
+@OperatorAnnotation(partitionable = true)
+public class DistinctMap<K, V> extends BaseKeyValueOperator<K, V>
+{
+  /**
+   * Input port receiving maps of key,val pairs.
+   */
+  public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
+  {
+    /**
+     * Walks the pairs of the tuple; each pair not yet recorded in the cache is
+     * emitted as a single-entry map and then recorded.
+     */
+    @Override
+    public void process(Map<K, V> tuple)
+    {
+      for (Map.Entry<K, V> pair : tuple.entrySet()) {
+        K key = pair.getKey();
+        V value = pair.getValue();
+        HashMap<V, Object> seen = mapkeyval.get(key);
+        if (seen != null && seen.containsKey(value)) {
+          continue; // already emitted since the cache was last cleared
+        }
+        HashMap<K, V> single = new HashMap<K, V>(1);
+        single.put(cloneKey(key), cloneValue(value));
+        distinct.emit(single);
+        if (seen == null) {
+          seen = new HashMap<V, Object>();
+          mapkeyval.put(cloneKey(key), seen);
+        }
+        seen.put(cloneValue(value), null);
+      }
+    }
+  };
+
+  /**
+   * Output port for the distinct pairs; partition outputs are merged through
+   * a UnifierHashMap.
+   */
+  public final transient DefaultOutputPort<HashMap<K, V>> distinct = new DefaultOutputPort<HashMap<K, V>>()
+  {
+    @Override
+    public Unifier<HashMap<K, V>> getUnifier()
+    {
+      return new UnifierHashMap<K, V>();
+    }
+  };
+
+
+  /**
+   * Cache of the values already emitted for each key; the inner map is used
+   * as a set (its values are always null).
+   */
+  protected HashMap<K, HashMap<V, Object>> mapkeyval = new HashMap<K, HashMap<V, Object>>();
+
+  /**
+   * Empties the cache so the next window starts with no recorded pairs.
+   */
+  @Override
+  public void endWindow()
+  {
+    mapkeyval.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeyVals.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeyVals.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeyVals.java
new file mode 100644
index 0000000..9925d69
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeyVals.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.Stateless;
+
+import com.datatorrent.lib.util.BaseKeyOperator;
+
+/**
+ * This operator filters the incoming stream of tuples using a set of 
specified key value pairs.&nbsp;
+ * Tuples that match the filter are emitted by the operator.
+ * <p>
+ * Filters the incoming stream based on specified key,val pairs, and emits
+ * those that match the filter. If property "inverse" is set to "true", then
+ * all key,val pairs except those specified in the keyvals parameter are
+ * emitted
+ * </p>
+ * <p>
+ * Operator assumes that the key, val pairs are immutable objects. If this 
operator has to be used for mutable objects,
+ * override "cloneKey()" to make copy of K, and "cloneValue()" to make copy of 
V.<br>
+ * This is a pass through node<br>
+ * <br>
+ * <b>StateFull : No, </b> tuple are processed in current window. <br>
+ * <b>Partitions : Yes, </b> no dependency among input tuples. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects HashMap&lt;K,V&gt;<br>
+ * <b>filter</b>: emits HashMap&lt;K,V&gt;(1)<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>keyvals</b>: The keyvals is key,val pairs to pass through, rest are 
filtered/dropped.<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Filter Keyval Pairs
+ * @category Rules and Alerts
+ * @tags filter, key value
+ *
+ * @since 0.3.2
+ * @deprecated
+ */
+@Deprecated
+@Stateless
+@OperatorAnnotation(partitionable = true)
+public class FilterKeyVals<K,V> extends BaseKeyOperator<K>
+{
+  /**
+   * The input port on which key value pairs are received.
+   */
+  public final transient DefaultInputPort<HashMap<K, V>> data = new DefaultInputPort<HashMap<K, V>>()
+  {
+    /**
+     * Tests every key,val pair of the tuple against the filter and emits each
+     * passing pair as its own single-entry map. With inverse unset a pair
+     * passes when it is in the filter; with inverse set it passes when it is not.
+     */
+    @Override
+    public void process(HashMap<K, V> tuple)
+    {
+      for (Map.Entry<K, V> e: tuple.entrySet()) {
+        // reuse one scratch map as the lookup key instead of allocating per pair
+        entry.clear();
+        entry.put(e.getKey(),e.getValue());
+        boolean contains = keyvals.containsKey(entry);
+        // emit on (match and not inverse) or (no match and inverse)
+        if (contains != inverse) {
+          HashMap<K, V> dtuple = new HashMap<K,V>(1);
+          dtuple.put(cloneKey(e.getKey()), cloneValue(e.getValue()));
+          filter.emit(dtuple);
+        }
+      }
+    }
+  };
+
+  /**
+   * The output port on which filtered key value pairs are emitted.
+   */
+  public final transient DefaultOutputPort<HashMap<K, V>> filter = new DefaultOutputPort<HashMap<K, V>>();
+
+  /**
+   * The filter: each key is a single-entry map holding one key,val pair; the
+   * mapped values are unused (always null).
+   */
+  @NotNull()
+  HashMap<HashMap<K,V>,Object> keyvals = new HashMap<HashMap<K,V>,Object>();
+  /**
+   * When false, pairs found in the filter are emitted; when true, pairs not
+   * found in the filter are emitted.
+   */
+  boolean inverse = false;
+  /**
+   * Scratch single-entry map reused by process() as the lookup key.
+   */
+  private transient HashMap<K,V> entry = new HashMap<K,V>(1);
+
+  /**
+   * Gets the inverse property.
+   * @return inverse
+   */
+  public boolean getInverse()
+  {
+    return inverse;
+  }
+
+  /**
+   * Sets the inverse property. If false then only matches are emitted. If
+   * true then only non matches are emitted.
+   * @param val new value of the inverse property
+   */
+  public void setInverse(boolean val)
+  {
+    inverse = val;
+  }
+
+  /**
+   * Gets the filter. Each key of the returned map is a single-entry map
+   * holding one key,val pair of the filter.
+   * @return keyvals hash
+   */
+  @NotNull()
+  public HashMap<HashMap<K,V>,Object> getKeyVals()
+  {
+    return keyvals;
+  }
+
+  /**
+   * Adds every key,val pair of the given map to the filter. Pairs added by
+   * earlier calls are kept. A null map is ignored.
+   * @param map with key,val pairs to set as filters
+   */
+  public void setKeyVals(HashMap<K,V> map)
+  {
+    if (map == null) {
+      return;
+    }
+    for (Map.Entry<K, V> e: map.entrySet()) {
+      HashMap<K, V> kvpair = new HashMap<K, V>(1);
+      kvpair.put(cloneKey(e.getKey()), cloneValue(e.getValue()));
+      keyvals.put(kvpair, null);
+    }
+  }
+
+  /**
+   * Clears the filter list
+   */
+  public void clearKeys()
+  {
+    keyvals.clear();
+  }
+
+  /**
+   * Clones V object. By default assumes immutable object (i.e. a copy is not
+   * made). If object is mutable, override this method
+   * @param v value to be cloned
+   * @return returns v as is (assumes immutable object)
+   */
+  public V cloneValue(V v)
+  {
+    return v;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysHashMap.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysHashMap.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysHashMap.java
new file mode 100644
index 0000000..cfee74c
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysHashMap.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.Stateless;
+
+import com.datatorrent.lib.util.BaseKeyOperator;
+
+
+/**
+ * This operator filters the incoming stream of key value pairs based on the 
keys specified by property "keys".
+ * <p>
+ * Filters the incoming stream based on keys specified by property "keys". If
+ * property "inverse" is set to "true", then all keys except those specified
+ * by "keys" are emitted
+ * </p>
+ * <p>
+ * Operator assumes that the key, val pairs are immutable objects. If this 
operator has to be used for mutable objects,
+ * override "cloneKey()" to make copy of K, and "cloneValue()" to make copy of 
V.<br>
+ * This is a pass through node.<br>
+ * <br>
+ * <b>StateFull : No, </b> tuple are processed in current window. <br>
+ * <b>Partitions : Yes, </b> no dependency among input tuples. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: Expects Map&lt;K, HashMap&lt;K,V&gt;&gt;. Filters are applied
+ * only on keys of the second (inner) hash map.<br>
+ * <b>filter</b>: Emits HashMap&lt;K, HashMap&lt;K,V&gt;&gt;.<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>keys</b>: The keys to pass through, rest are filtered/dropped. A comma 
separated list of keys.<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Filter Keyval Pairs By Key HashMap
+ * @category Stream Manipulators
+ * @tags filter, key value
+ *
+ * @since 0.3.2
+ * @deprecated
+ */
+@Deprecated
+@Stateless
+@OperatorAnnotation(partitionable = true)
+public class FilterKeysHashMap<K, V> extends BaseKeyOperator<K>
+{
+  /**
+   * Filter keys map.
+   */
+  @NotNull()
+  HashMap<K, V> keys = new HashMap<K, V>();
+
+  /**
+   * Emits key not in filter map.
+   */
+  boolean inverse = false;
+
+  /**
+   * The input port on which key value pairs are received.
+   */
+  public final transient DefaultInputPort<Map<K, HashMap<K, V>>> data = new 
DefaultInputPort<Map<K, HashMap<K, V>>>()
+  {
+    /**
+     * Processes incoming tuples one key,val at a time. Emits if at least one 
key makes the cut.
+     * By setting inverse as true, match is changed to un-matched.
+     */
+    @Override
+    public void process(Map<K, HashMap<K, V>> tuple)
+    {
+      HashMap<K, HashMap<K, V>> dtuple = null;
+      for (Map.Entry<K, HashMap<K, V>> e: tuple.entrySet()) {
+        HashMap<K, V> dtuple2 = null;
+        for (Map.Entry<K, V> e2: e.getValue().entrySet()) {
+          boolean contains = keys.containsKey(e2.getKey());
+          if ((contains && !inverse) || (!contains && inverse)) {
+            if (dtuple2 == null) {
+              dtuple2 = new HashMap<K, V>(4); // usually the filter keys are 
very few, so 4 is just fine
+            }
+            dtuple2.put(cloneKey(e2.getKey()), cloneValue(e2.getValue()));
+          }
+        }
+        if (dtuple == null && dtuple2 != null) {
+          dtuple = new HashMap<K, HashMap<K, V>>();
+        }
+        if (dtuple != null && dtuple2 != null) {
+          dtuple.put(cloneKey(e.getKey()), dtuple2);
+        }
+      }
+      if (dtuple != null) {
+        filter.emit(dtuple);
+      }
+    }
+  };
+
+  /**
+   * The output port on which filtered key value pairs are emitted.
+   */
+  public final transient DefaultOutputPort<HashMap<K, HashMap<K, V>>> filter = 
new DefaultOutputPort<HashMap<K, HashMap<K, V>>>();
+
+  /**
+   * getter function for parameter inverse
+   *
+   * @return inverse
+   */
+  public boolean getInverse()
+  {
+    return inverse;
+  }
+
+  /**
+   * True means match; False means unmatched
+   *
+   * @param val
+   */
+  public void setInverse(boolean val)
+  {
+    inverse = val;
+  }
+
+  /**
+   * Adds a key to the filter list
+   *
+   * @param str
+   */
+  public void setKey(K str)
+  {
+    keys.put(str, null);
+  }
+
+  /**
+   * Adds the list of keys to the filter list
+   *
+   * @param list
+   */
+  public void setKeys(K[] list)
+  {
+    if (list != null) {
+      for (K e: list) {
+        keys.put(e, null);
+      }
+    }
+  }
+
+  /*
+   * Clears the filter list
+   */
+  public void clearKeys()
+  {
+    keys.clear();
+  }
+
+  /**
+   * Clones V object. By default assumes immutable object (i.e. a copy is not 
made). If object is mutable, override this method
+   *
+   * @param v value to be cloned
+   * @return returns v as is (assumes immutable object)
+   */
+  public V cloneValue(V v)
+  {
+    return v;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysMap.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysMap.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysMap.java
new file mode 100644
index 0000000..43386f0
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysMap.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.Stateless;
+
+import com.datatorrent.lib.util.BaseKeyOperator;
+import com.datatorrent.lib.util.UnifierHashMap;
+
+/**
+ * This operator filters the incoming stream of key value pairs based on the
+ * keys specified by property "keys".
+ * <p>
+ * Filters the incoming stream based of keys specified by property "keys". If
+ * property "inverse" is set to "true", then all keys except those specified 
by "keys" are emitted
+ * </p>
+ * <p>
+ * Operator assumes that the key, val pairs are immutable objects. If this 
operator has to be used for mutable objects,
+ * override "cloneKey()" to make copy of K, and "cloneValue()" to make copy of 
V.<br>
+ * This is a pass through node<br>
+ * <br>
+ * <b>StateFull : No, </b> tuple are processed in current window. <br>
+ * <b>Partitions : Yes, </b> no dependency among input tuples. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: Expects Map&lt;K,V&gt;<br>
+ * <b>filter</b>: Emits HashMap&lt;K,V&gt;<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>keys</b>: The keys to pass through, rest are filtered/dropped. A comma 
separated list of keys<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Filter Keyval Pairs By Key Generic
+ * @category Rules and Alerts
+ * @tags filter, key value
+ *
+ * @since 0.3.2
+ * @deprecated
+ */
+@Deprecated
+@Stateless
+@OperatorAnnotation(partitionable = true)
+public class FilterKeysMap<K,V> extends BaseKeyOperator<K>
+{
+  /**
+   * Filter keys map.
+   */
+  @NotNull()
+  HashMap<K, V> keys = new HashMap<K, V>();
+
+  /**
+   * Emits key not in filter map.
+   */
+  boolean inverse = false;
+
+  /**
+   * The input port on which key value pairs are received.
+   */
+  public final transient DefaultInputPort<Map<K, V>> data = new 
DefaultInputPort<Map<K, V>>()
+  {
+    /**
+     * Processes incoming tuples one key,val at a time. Emits if at least one 
key makes the cut
+     * By setting inverse as true, match is changed to un-matched
+     */
+    @Override
+    public void process(Map<K, V> tuple)
+    {
+      HashMap<K, V> dtuple = null;
+      for (Map.Entry<K, V> e: tuple.entrySet()) {
+        boolean contains = keys.containsKey(e.getKey());
+        if ((contains && !inverse) || (!contains && inverse)) {
+          if (dtuple == null) {
+            dtuple = new HashMap<K, V>(4); // usually the filter keys are very 
few, so 4 is just fine
+          }
+          dtuple.put(cloneKey(e.getKey()), cloneValue(e.getValue()));
+        }
+      }
+      if (dtuple != null) {
+        filter.emit(dtuple);
+      }
+    }
+  };
+
+  /**
+   * The output port on which filtered key value pairs are emitted.
+   */
+  public final transient DefaultOutputPort<HashMap<K, V>> filter = new 
DefaultOutputPort<HashMap<K, V>>()
+  {
+    @Override
+    public Unifier<HashMap<K, V>> getUnifier()
+    {
+      return new UnifierHashMap<K, V>();
+    }
+  };
+
+  /**
+   * If true then only matches are emitted. If false then only non matches are 
emitted.
+   * @return inverse
+   */
+  public boolean getInverse()
+  {
+    return inverse;
+  }
+
+
+  /**
+   * Sets the inverse property. If true then only matches are emitted. If 
false then only non matches are emitted.
+   * @param val
+   */
+  public void setInverse(boolean val)
+  {
+    inverse = val;
+  }
+
+  /**
+   * Adds a key to the filter list
+   * @param str
+   */
+  public void setKey(K str)
+  {
+    keys.put(str, null);
+  }
+
+  /**
+   * Adds the list of keys to the filter list
+   * @param list
+   */
+  public void setKeys(K[] list)
+  {
+    if (list != null) {
+      for (K e: list) {
+        keys.put(e, null);
+      }
+    }
+  }
+
+  /**
+   * The keys to filter. The values in the map should be null.
+   * @param keys
+   */
+  public void setKeys(HashMap<K, V> keys)
+  {
+    this.keys = keys;
+  }
+
+  /**
+   * Gets the keys to filter.
+   * @return Returns a map containing the keys.
+   */
+  public HashMap<K, V> getKeys()
+  {
+    return keys;
+  }
+
+  /*
+   * Clears the filter list
+   */
+  public void clearKeys()
+  {
+    keys.clear();
+  }
+
+  /**
+   * Clones V object. By default assumes immutable object (i.e. a copy is not 
made). If object is mutable, override this method
+   * @param v value to be cloned
+   * @return returns v as is (assumes immutable object)
+   */
+  public V cloneValue(V v)
+  {
+    return v;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstMatchMap.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstMatchMap.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstMatchMap.java
new file mode 100644
index 0000000..7649706
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstMatchMap.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
+import com.datatorrent.lib.util.BaseMatchOperator;
+
+/**
+ * This operator filters the incoming stream of key value pairs by obtaining 
the values corresponding to a specified key,
+ * and comparing those values to a specified number.&nbsp;The first key value 
pair, in each window, to satisfy the comparison is emitted.
+ * <p>
+ * A compare metric on a Number tuple based on the property "key", "value", 
and "cmp"; the first match is emitted.
+ *  The comparison is done by getting double value from the Number.
+ * </p>
+ * <p>
+ * This module is a pass through<br>
+ * The operators by default assumes immutable keys. If the key is mutable, use 
cloneKey to make a copy<br>
+ * <br>
+ * <b>StateFull : Yes, </b> tuple are processed in current window. <br>
+ * <b>Partitions : No, </b>will yield wrong results. <br>
+ * <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects Map&lt;K,V extends Number&gt;<br>
+ * <b>first</b>: emits HashMap&lt;K,V&gt;<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>key</b>: The key on which compare is done<br>
+ * <b>value</b>: The value to compare with<br>
+ * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", 
"neq", "gt", "gte". Default is "eq"<br>
+ * <br>
+ * <b>Specific compile time checks</b>:<br>
+ * Key must be non empty<br>
+ * Value must be able to convert to a "double"<br>
+ * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", 
"gt", "gte"<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Emit First Match (Number)
+ * @category Rules and Alerts
+ * @tags filter, key value, numeric
+ *
+ * @since 0.3.2
+ * @deprecated
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = false)
+public class FirstMatchMap<K, V extends Number> extends BaseMatchOperator<K,V>
+{
+  /**
+   * True once a tuple has been emitted in the current window.
+   */
+  boolean emitted = false;
+
+  /**
+   * Input port receiving the key,val maps to be tested.
+   */
+  public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
+  {
+    /**
+     * Compares the value stored under the configured key. The first tuple of
+     * the window that satisfies the comparison is emitted as a copy; all
+     * later tuples of that window are ignored.
+     */
+    @Override
+    public void process(Map<K, V> tuple)
+    {
+      if (emitted) {
+        return;
+      }
+      V candidate = tuple.get(getKey());
+      // tuples without the key are skipped
+      if (candidate != null && compareValue(candidate.doubleValue())) {
+        first.emit(cloneTuple(tuple));
+        emitted = true;
+      }
+    }
+  };
+
+  /**
+   * Output port carrying the first matching tuple of each window.
+   */
+  public final transient DefaultOutputPort<HashMap<K, V>> first = new DefaultOutputPort<HashMap<K, V>>();
+
+  /**
+   * Clears the emitted flag so that every window can emit one match.
+   * @param windowId id of the window being started (not used)
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    emitted = false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstN.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstN.java 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstN.java
new file mode 100644
index 0000000..f067fbb
--- /dev/null
+++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstN.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.mutable.MutableInt;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
+import com.datatorrent.lib.util.AbstractBaseNOperatorMap;
+
+/**
+ * This operator filters the incoming stream of key value pairs by emitting 
the first N key value pairs with a specified key in each window.
+ * <p>
+ * Emits first N tuples of a particular key.
+ * </p>
+ * <p>
+ * This module is a pass through module<br>
+ * <br>
+ * <b>StateFull : Yes, </b> tuple are compare across application window(s). 
<br>
+ * <b>Partitions : No, </b> will yield wrong results. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: Input data port expects HashMap&lt;K,V&gt;<br>
+ * <b>first</b>: Output data port, emits HashMap&lt;K,V&gt;<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>N</b>: The number of first values to be emitted per key<br>
+ * <br>
+ * <b>Specific compile time checks are</b>:<br>
+ * N: Has to be >= 1<br>
+ * <br>
+ * <br>
+ * </p>
+ *
+ * @displayName First N Keyval Pairs Matching Key
+ * @category Rules and Alerts
+ * @tags filter, key value
+ * @deprecated
+ * @since 0.3.2
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = false)
+public class FirstN<K,V> extends AbstractBaseNOperatorMap<K, V>
+{
+  /**
+   * Number of pairs seen so far for each key since the last endWindow.
+   */
+  HashMap<K, MutableInt> keycount = new HashMap<K, MutableInt>();
+
+  /**
+   * Counts every pair of the tuple against its key and emits the pair while
+   * the count for that key has not exceeded N.
+   * @param tuple key,val pairs to count and possibly emit
+   */
+  @Override
+  public void processTuple(Map<K, V> tuple)
+  {
+    for (Map.Entry<K, V> pair : tuple.entrySet()) {
+      K key = pair.getKey();
+      MutableInt seen = keycount.get(key);
+      if (seen == null) {
+        seen = new MutableInt(0);
+        keycount.put(key, seen);
+      }
+      seen.increment();
+      if (seen.intValue() > getN()) {
+        continue; // this key already produced its first N pairs
+      }
+      first.emit(cloneTuple(key, pair.getValue()));
+    }
+  }
+
+  /**
+   * Output port carrying the first N pairs of every key.
+   */
+  public final transient DefaultOutputPort<HashMap<K, V>> first = new DefaultOutputPort<HashMap<K, V>>();
+
+  /**
+   * Drops all per-key counts so counting restarts in the next window.
+   */
+  @Override
+  public void endWindow()
+  {
+    keycount.clear();
+  }
+
+  /**
+   * Sets N, the number of pairs emitted per key; delegates to the superclass.
+   *
+   * @param val new value of N
+   */
+  public void setN(int val)
+  {
+    super.setN(val);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstTillMatch.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstTillMatch.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstTillMatch.java
new file mode 100644
index 0000000..c32a0f2
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstTillMatch.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
+import com.datatorrent.lib.util.BaseMatchOperator;
+
+/**
+ * This operator filters the incoming stream of key value pairs by obtaining 
the values corresponding to a specified key,
+ * and comparing those values to a specified number.&nbsp;For each window, all 
key value pairs are emitted by the operator until a value satisfying the 
comparison is encountered.
+ * <p>
+ * All key.val pairs with val sub-classed from Number are emitted till the 
first match;  A compare metric is done based on the property "key",
+ * "value", and "cmp". From then on, no tuple is emitted in that window. The 
comparison is done by getting double value of the Number.
+ * </p>
+ * <p>
+ * This module is a pass through<br>
+ * <br>
+ * <b>StateFull : Yes, </b> tuple are processed in current window. <br>
+ * <b>Partitions : No, </b>will yield wrong results. <br>
+ * <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: Input port, expects HashMap&lt;K,V&gt;<br>
+ * <b>first</b>: Output port, emits HashMap&lt;K,V&gt; until the compare function 
first returns true in the window<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>key</b>: The key on which compare is done<br>
+ * <b>value</b>: The value to compare with<br>
+ * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", 
"neq", "gt", "gte". Default is "eq"<br>
+ * <br>
+ * <b>Specific compile time checks</b>:<br>
+ * Key must be non empty<br>
+ * Value must be able to convert to a "double"<br>
+ * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", 
"gt", "gte"<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Emit Keyval Pairs Until Match (Number)
+ * @category Rules and Alerts
+ * @tags filter, key value, numeric
+ * @deprecated
+ * @since 0.3.2
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = false)
+public class FirstTillMatch<K, V extends Number> extends BaseMatchOperator<K, V>
+{
+  /**
+   * Set once a value satisfying the comparison has been seen in the current
+   * window; while set, incoming tuples are ignored.
+   */
+  boolean emitted = false;
+
+  /**
+   * Input port receiving the key value pairs to be checked.
+   */
+  public final transient DefaultInputPort<HashMap<K, V>> data = new DefaultInputPort<HashMap<K, V>>()
+  {
+    /**
+     * Forwards a copy of the tuple to port "first" until the value stored
+     * under the configured key satisfies the comparison. The matching tuple
+     * itself is not emitted, and tuples that lack the key are skipped.
+     */
+    @Override
+    public void process(HashMap<K, V> tuple)
+    {
+      if (emitted) {
+        return;
+      }
+      final V candidate = tuple.get(getKey());
+      if (candidate == null) { // the configured key is absent from this tuple
+        return;
+      }
+      if (compareValue(candidate.doubleValue())) {
+        // First match in this window: stop emitting from here on.
+        emitted = true;
+        return;
+      }
+      first.emit(cloneTuple(tuple));
+    }
+  };
+
+  /**
+   * Output port carrying the key value pairs received before the first match.
+   */
+  public final transient DefaultOutputPort<HashMap<K, V>> first = new DefaultOutputPort<HashMap<K, V>>();
+
+  /**
+   * Re-arms the operator for the new window by clearing the match flag.
+   *
+   * @param windowId id of the window that is starting (not used)
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    emitted = false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InsertSortDesc.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InsertSortDesc.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InsertSortDesc.java
new file mode 100644
index 0000000..4af9091
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InsertSortDesc.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.PriorityQueue;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.AbstractBaseSortOperator;
+import com.datatorrent.lib.util.ReversibleComparator;
+
+/**
+ * This operator takes the values it receives each window and outputs them in 
descending order at the end of each window.
+ * <p>
+ * Incoming tuple is inserted into already existing sorted list in a 
descending order. At the end of the window the resultant sorted list is emitted 
on the output ports.
+ * </p>
+ * <p>
+ * <br>
+ * <b>StateFull : Yes, </b> tuple are compare across application window(s). 
<br>
+ * <b>Partitions : No, </b> will yield wrong results. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects K<br>
+ * <b>datalist</b>: expects ArrayList&lt;K&gt;<br>
+ * <b>sort</b>: emits ArrayList&lt;K&gt;<br>
+ * <b>sorthash</b>: emits HashMap&lt;K,Integer&gt;<br>
+ * <br>
+ * <br>
+ * </p>
+ * @displayName Sort Descending
+ * @category Stream Manipulators
+ * @tags rank, sort
+ * @deprecated
+ * @since 0.3.2
+ */
+@Deprecated
+//
+// TODO: Override PriorityQueue and rewrite addAll to insert with location
+//
+@OperatorAnnotation(partitionable = false)
+public class InsertSortDesc<K> extends AbstractBaseSortOperator<K>
+{
+  /**
+   * Input port for single tuples that are to be sorted.
+   */
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<K> data = new DefaultInputPort<K>()
+  {
+    /**
+     * Hands the tuple to the parent's processTuple.
+     */
+    @Override
+    public void process(K tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+  /**
+   * Input port for lists of tuples that are to be sorted.
+   */
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<ArrayList<K>> datalist = new DefaultInputPort<ArrayList<K>>()
+  {
+    /**
+     * Hands the whole list to the parent's processTuple.
+     */
+    @Override
+    public void process(ArrayList<K> tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * Output port on which the sorted list of tuples is emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ArrayList<K>> sort = new DefaultOutputPort<ArrayList<K>>();
+  /**
+   * Output port on which a map from each tuple to the number of times it
+   * occurred in the application window is emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, Integer>> sorthash = new DefaultOutputPort<HashMap<K, Integer>>();
+
+  /**
+   * Creates the priority queue with the configured size and a
+   * ReversibleComparator constructed with {@code false}.
+   */
+  @Override
+  public void initializeQueue()
+  {
+    final ReversibleComparator<K> order = new ReversibleComparator<K>(false);
+    pqueue = new PriorityQueue<K>(getSize(), order);
+  }
+
+
+  /**
+   * Emits the list on port "sort".
+   *
+   * @param list the list to emit
+   */
+  @Override
+  public void emitToList(ArrayList<K> list)
+  {
+    sort.emit(list);
+  }
+
+  /**
+   * Emits the map on port "sorthash".
+   *
+   * @param map the map to emit
+   */
+  @Override
+  public void emitToHash(HashMap<K,Integer> map)
+  {
+    sorthash.emit(map);
+  }
+
+  /**
+   * @return true if port "sort" is connected
+   */
+  @Override
+  public boolean doEmitList()
+  {
+    return sort.isConnected();
+  }
+
+  /**
+   * @return true if port "sorthash" is connected
+   */
+  @Override
+  public boolean doEmitHash()
+  {
+    return sorthash.isConnected();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndex.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndex.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndex.java
new file mode 100644
index 0000000..7964ed7
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndex.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator.Unifier;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
+import com.datatorrent.lib.util.BaseKeyValueOperator;
+
+/**
+ * This operator takes a stream of key value pairs each window,
+ * and outputs a set of inverted key value pairs at the end of each window.
+ * <p>
+ * Inverts the index and sends out the tuple on output port "index" at the end 
of the window.
+ * </p>
+ * <p>
+ * This is an end of window operator<br>
+ * <br>
+ * <b>StateFull : Yes, </b> tuple are compare across application window(s). 
<br>
+ * <b>Partitions : Yes, </b> inverted indexes are unified by instance of same 
operator. <br>
+ * <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects &lt;K,V&gt;<br>
+ * <b>index</b>: emits &lt;V,ArrayList&lt;K&gt;&gt;(1); one HashMap per V<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Invert Key Value Pairs
+ * @category Stream Manipulators
+ * @tags key value
+ *
+ * @since 0.3.2
+ * @deprecated
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = true)
+public class InvertIndex<K, V> extends BaseKeyValueOperator<K, V> implements Unifier<HashMap<V, ArrayList<K>>>
+{
+  /**
+   * Inverted index accumulated during the current window: each value maps to
+   * the list of keys it was seen with.
+   */
+  protected HashMap<V, ArrayList<K>> map = new HashMap<V, ArrayList<K>>();
+
+  /**
+   * The input port on which key value pairs are received.
+   */
+  public final transient DefaultInputPort<HashMap<K, V>> data = new DefaultInputPort<HashMap<K, V>>()
+  {
+    /**
+     * Reverse indexes a HashMap&lt;K, V&gt; tuple: every non-null value is
+     * recorded together with a clone of its key. Entries with a null value
+     * are skipped.
+     */
+    @Override
+    public void process(HashMap<K, V> tuple)
+    {
+      for (Map.Entry<K, V> e: tuple.entrySet()) {
+        if (e.getValue() == null) { // error tuple?
+          continue;
+        }
+        insert(e.getValue(), cloneKey(e.getKey()));
+      }
+    }
+  };
+
+  /**
+   * The output port on which inverted key value pairs are emitted. Partial
+   * results from partitions are merged by another InvertIndex instance
+   * acting as unifier.
+   */
+  public final transient DefaultOutputPort<HashMap<V, ArrayList<K>>> index = new DefaultOutputPort<HashMap<V, ArrayList<K>>>()
+  {
+    @Override
+    public Unifier<HashMap<V, ArrayList<K>>> getUnifier()
+    {
+      return new InvertIndex<K, V>();
+    }
+  };
+
+  /**
+   * Appends the key to the list kept for the given value, creating the list
+   * (stored under a clone of the value) the first time the value is seen.
+   *
+   * @param val value under which the key is indexed
+   * @param key key to append
+   */
+  void insert(V val, K key)
+  {
+    ArrayList<K> list = map.get(val);
+    if (list == null) {
+      list = new ArrayList<K>(4);
+      map.put(cloneValue(val), list);
+    }
+    list.add(key);
+  }
+
+  /**
+   * Emits one single-entry HashMap per indexed value on port "index" and
+   * then clears the accumulated index.
+   */
+  @Override
+  public void endWindow()
+  {
+    for (Map.Entry<V, ArrayList<K>> e: map.entrySet()) {
+      HashMap<V, ArrayList<K>> tuple = new HashMap<V, ArrayList<K>>(1);
+      tuple.put(e.getKey(), e.getValue());
+      index.emit(tuple);
+    }
+    map.clear();
+  }
+
+  /**
+   * Unifier override: merges a partial inverted index into the accumulated
+   * index so that endWindow() emits the combined key lists.
+   *
+   * @param tuple partial inverted index to merge
+   */
+  @Override
+  public void process(HashMap<V, ArrayList<K>> tuple)
+  {
+    for (Map.Entry<V, ArrayList<K>> e: tuple.entrySet()) {
+      ArrayList<K> keys = map.get(e.getKey());
+      if (keys == null) {
+        keys = new ArrayList<K>();
+        // The list must live in the map; otherwise the merged keys are
+        // discarded and nothing is emitted at the end of the window.
+        map.put(e.getKey(), keys);
+      }
+      keys.addAll(e.getValue());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexArray.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexArray.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexArray.java
new file mode 100644
index 0000000..26b77ac
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexArray.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
+import com.datatorrent.lib.util.BaseKeyValueOperator;
+
+/**
+ * This operator takes a stream of key value pairs each window,
+ * and outputs a set of inverted key value pairs at the end of each 
window.&nbsp;
+ * The values in the key value pairs received by this operator are array 
lists, which may contain multiple values.
+ * <p>
+ * Inverts the index and sends out the tuple on output port "index" at the end 
of the window.
+ * </p>
+ * <p>
+ * This is an end of window operator<br>
+ * <br>
+ * <b>StateFull : Yes, </b> tuple are compare across application window(s). 
<br>
+ * <b>Partitions : Yes, </b> inverted indexes are unified by instance of same 
operator. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects HashMap&lt;K,ArrayList&lt;V&gt;&gt;<br>
+ * <b>index</b>: emits HashMap&lt;V,ArrayList&lt;K&gt;&gt;(1), one HashMap per 
V<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Invert Key Value Pairs (Array)
+ * @category Stream Manipulators
+ * @tags key value
+ *
+ * @since 0.3.2
+ * @deprecated
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = true)
+public class InvertIndexArray<K, V> extends BaseKeyValueOperator<K,V>
+{
+  /**
+   * Inverted index accumulated during the current window: each value maps to
+   * the list of keys it was seen with.
+   */
+  protected HashMap<V, ArrayList<K>> map = new HashMap<V, ArrayList<K>>();
+
+  /**
+   * Input port receiving maps from a key to the list of its values.
+   */
+  public final transient DefaultInputPort<HashMap<K, ArrayList<V>>> data = new DefaultInputPort<HashMap<K, ArrayList<V>>>()
+  {
+    /**
+     * Reverse indexes a HashMap&lt;K, ArrayList&lt;V&gt;&gt; tuple: each value
+     * in a key's list is recorded together with a clone of that key.
+     * Entries whose list is null are skipped.
+     */
+    @Override
+    public void process(HashMap<K, ArrayList<V>> tuple)
+    {
+      for (Map.Entry<K, ArrayList<V>> entry : tuple.entrySet()) {
+        final ArrayList<V> values = entry.getValue();
+        if (values != null) { // a null list is treated as an error tuple
+          for (V value : values) {
+            insert(value, cloneKey(entry.getKey()));
+          }
+        }
+      }
+    }
+  };
+
+  /**
+   * Output port on which inverted key value pairs are emitted; an
+   * InvertIndex instance is returned as its unifier.
+   */
+  public final transient DefaultOutputPort<HashMap<V, ArrayList<K>>> index = new DefaultOutputPort<HashMap<V, ArrayList<K>>>()
+  {
+    @Override
+    public Unifier<HashMap<V, ArrayList<K>>> getUnifier()
+    {
+      return new InvertIndex<K, V>();
+    }
+  };
+
+  /**
+   * Appends the key to the list kept for the given value, creating the list
+   * (stored under a clone of the value) the first time the value is seen.
+   *
+   * @param val value under which the key is indexed
+   * @param key key to append
+   */
+  void insert(V val, K key)
+  {
+    ArrayList<K> keys = map.get(val);
+    if (keys == null) {
+      keys = new ArrayList<K>(4);
+      map.put(cloneValue(val), keys);
+    }
+    keys.add(key);
+  }
+
+  /**
+   * Emits one single-entry HashMap per indexed value on port "index" and
+   * then clears the accumulated index.
+   */
+  @Override
+  public void endWindow()
+  {
+    for (Map.Entry<V, ArrayList<K>> entry : map.entrySet()) {
+      final HashMap<V, ArrayList<K>> single = new HashMap<V, ArrayList<K>>(1);
+      single.put(entry.getKey(), entry.getValue());
+      index.emit(single);
+    }
+    map.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMap.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMap.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMap.java
new file mode 100644
index 0000000..188c9b1
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMap.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+
+import com.datatorrent.lib.util.BaseMatchOperator;
+
+/**
+ * This operator filters the incoming stream of key value pairs by obtaining 
the values corresponding to a specified key,
+ * and comparing those values to a specified value.&nbsp;The last key value 
pair, in each window, to satisfy the comparison is emitted.
+ * <p>
+ * A compare function is  operated on a tuple value sub-classed from Number 
based on the property "key", "value", and "cmp". Every tuple
+ * is checked and the last one that passes the condition is sent at the end of 
the window on port "last". The comparison is done by getting double
+ * value from the Number.
+ * </p>
+ * <p>
+ * This module is an end of window module<br>
+ * <br>
+ * <b>StateFull : Yes, </b> tuple are compare across application window(s). 
<br>
+ * <b>Partitions : No, </b> will yield wrong result. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects Map&lt;K,V extends Number&gt;<br>
+ * <b>last</b>: emits Map&lt;K,V&gt; in end of window for the last tuple on 
which the compare function is true<br>
+ * <br>
+ * <b>Properties</b>:<br>
+ * <b>key</b>: The key on which compare is done<br>
+ * <b>value</b>: The value to compare with<br>
+ * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", 
"neq", "gt", "gte". Default is "eq"<br>
+ * <br>
+ * <b>Specific compile time checks</b>:<br>
+ * Key must be non empty<br>
+ * Value must be able to convert to a "double"<br>
+ * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", 
"gt", "gte"<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Emit Last Match (Number)
+ * @category Rules and Alerts
+ * @tags filter, key value, numeric
+ * @deprecated
+ * @since 0.3.2
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = false)
+public class LastMatchMap<K, V extends Number> extends BaseMatchOperator<K,V>
+{
+  /**
+   * Copy of the most recent tuple in the current window that satisfied the
+   * comparison, or null if none has matched yet.
+   */
+  protected HashMap<K, V> ltuple = null;
+
+  /**
+   * Input port receiving the key value pairs to be checked.
+   */
+  public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
+  {
+    /**
+     * Remembers a copy of the tuple when the value stored under the
+     * configured key satisfies the comparison. Tuples that lack the key are
+     * ignored.
+     */
+    @Override
+    public void process(Map<K, V> tuple)
+    {
+      final V candidate = tuple.get(getKey());
+      if (candidate != null && compareValue(candidate.doubleValue())) {
+        ltuple = cloneTuple(tuple);
+      }
+    }
+  };
+
+  /**
+   * Output port on which the last matching key value pair of the window is emitted.
+   */
+  public final transient DefaultOutputPort<HashMap<K, V>> last = new DefaultOutputPort<HashMap<K, V>>();
+
+  /**
+   * Emits the remembered tuple, if any, and forgets it so the next window
+   * starts without a match.
+   */
+  @Override
+  public void endWindow()
+  {
+    if (ltuple == null) {
+      return;
+    }
+    last.emit(ltuple);
+    ltuple = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyMap.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyMap.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyMap.java
new file mode 100644
index 0000000..af5229c
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyMap.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+
+import com.datatorrent.lib.util.AbstractBaseFrequentKey;
+import com.datatorrent.lib.util.UnifierArrayHashMapFrequent;
+import com.datatorrent.lib.util.UnifierHashMapFrequent;
+
+/**
+ * This operator filters the incoming stream of key value pairs by finding the 
key or keys (if there is a tie) that occur the fewest number of times within 
each window.&nbsp;
+ * A list of the corresponding key value pairs are then output to the port 
named "list" and one of the corresponding key value pairs is output to the port 
"least", at the end of each window.
+ * <p>
+ * Occurrences of each key is counted and at the end of window any of the 
least frequent key is emitted on output port least and all least frequent
+ * keys on output port list.
+ * </p>
+ * <p>
+ * This module is an end of window module. In case of a tie any of the least 
key would be emitted. The list port would however have all the tied keys<br>
+ * <br>
+ * <b>StateFull : Yes, </b> tuple are compared across application window(s). 
<br>
+ * <b>Partitions : Yes, </b> least keys are unified on output port. <br>
+ * <br>
+ * <b>Ports</b>:<br>
+ * <b>data</b>: expects Map&lt;K,V&gt;, V is ignored/not used<br>
+ * <b>least</b>: emits HashMap&lt;K,Integer&gt;(1); where K is the least 
frequent key, and Integer is the number of its occurrences in the window<br>
+ * <b>list</b>: emits ArrayList&lt;HashMap&lt;K,Integer&gt;(1)&gt;; where the 
list includes all the keys that are least frequent<br>
+ * <br>
+ * </p>
+ *
+ * @displayName Emit Least Frequent Tuple Key
+ * @category Rules and Alerts
+ * @tags filter, key value, count
+ * @deprecated
+ * @since 0.3.2
+ */
+@Deprecated
+@OperatorAnnotation(partitionable = true)
+public class LeastFrequentKeyMap<K, V> extends AbstractBaseFrequentKey<K>
+{
+  /**
+   * Input port receiving key value pairs; only the keys are used.
+   */
+  public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
+  {
+    /**
+     * Passes every key of the tuple to processTuple; values are ignored.
+     */
+    @Override
+    public void process(Map<K, V> tuple)
+    {
+      for (K key : tuple.keySet()) {
+        processTuple(key);
+      }
+    }
+  };
+
+  /**
+   * Output port fed by emitTuple; per the class description it carries one
+   * of the least frequent keys together with its count.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<HashMap<K, Integer>> least = new DefaultOutputPort<HashMap<K, Integer>>()
+  {
+    @Override
+    public Unifier<HashMap<K, Integer>> getUnifier()
+    {
+      final UnifierHashMapFrequent<K> unifier = new UnifierHashMapFrequent<K>();
+      unifier.setLeast(true);
+      return unifier;
+    }
+  };
+
+  /**
+   * Output port fed by emitList; per the class description it carries all
+   * keys tied for least frequent together with their counts.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ArrayList<HashMap<K, Integer>>> list = new DefaultOutputPort<ArrayList<HashMap<K, Integer>>>()
+  {
+    @Override
+    public Unifier<ArrayList<HashMap<K, Integer>>> getUnifier()
+    {
+      final UnifierArrayHashMapFrequent<K> unifier = new UnifierArrayHashMapFrequent<K>();
+      unifier.setLeast(true);
+      return unifier;
+    }
+  };
+
+  /**
+   * Emits the tuple on port "least".
+   *
+   * @param tuple the tuple to emit
+   */
+  @Override
+  public void emitTuple(HashMap<K, Integer> tuple)
+  {
+    least.emit(tuple);
+  }
+
+  /**
+   * Emits the list on port "list".
+   *
+   * @param tlist the list to emit
+   */
+  @Override
+  public void emitList(ArrayList<HashMap<K, Integer>> tlist)
+  {
+    list.emit(tlist);
+  }
+
+  /**
+   * Comparison hook: reports whether the first count is strictly smaller
+   * than the second.
+   *
+   * @param val1 first count
+   * @param val2 second count
+   * @return true if val1 is less than val2
+   */
+  @Override
+  public boolean compareCount(int val1, int val2)
+  {
+    return val2 > val1;
+  }
+}

Reply via email to