http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java
new file mode 100644
index 0000000..6ad818e
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition;
+import org.apache.apex.malhar.contrib.misc.streamquery.function.SumFunction;
+
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional test for {@link 
org.apache.apex.malhar.contrib.misc.streamquery.GroupByOperatorTest}.
+ * @deprecated
+ */
+@Deprecated
+public class GroupByOperatorTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlGroupBy()
+  {
+    // create operator
+    GroupByHavingOperator oper = new GroupByHavingOperator();
+    oper.addColumnGroupByIndex(new ColumnIndex("b", null));
+    try {
+      oper.addAggregateIndex(new SumFunction("c", null));
+    } catch (Exception e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+      return;
+    }
+
+    EqualValueCondition condition = new EqualValueCondition();
+    condition.addEqualValue("a", 1);
+    oper.setCondition(condition);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 1);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 2);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 2);
+    tuple.put("c", 7);
+    oper.inport.process(tuple);
+    
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(GroupByOperatorTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java
new file mode 100644
index 0000000..5b696f1
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition;
+import 
org.apache.apex.malhar.contrib.misc.streamquery.condition.HavingCompareValue;
+import 
org.apache.apex.malhar.contrib.misc.streamquery.condition.HavingCondition;
+import org.apache.apex.malhar.contrib.misc.streamquery.function.FunctionIndex;
+import org.apache.apex.malhar.contrib.misc.streamquery.function.SumFunction;
+
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional test for {@link 
org.apache.apex.malhar.contrib.misc.streamquery.HavingOperatorTest}.
+ * @deprecated
+ */
+@Deprecated
+public class HavingOperatorTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlGroupBy() throws Exception
+  {
+    // create operator
+    GroupByHavingOperator oper = new GroupByHavingOperator();
+    oper.addColumnGroupByIndex(new ColumnIndex("b", null));
+    FunctionIndex sum = new SumFunction("c", null);
+    oper.addAggregateIndex(sum);
+
+    // create having condition
+    HavingCondition having = new HavingCompareValue<Double>(sum, 6.0, 0);
+    oper.addHavingCondition(having);
+
+    EqualValueCondition condition = new EqualValueCondition();
+    condition.addEqualValue("a", 1);
+    oper.setCondition(condition);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 1);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 2);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 2);
+    tuple.put("c", 7);
+    oper.inport.process(tuple);
+    
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HavingOperatorTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java
new file mode 100644
index 0000000..8b4f923
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * 
+ * Functional test for {@link 
com.datatorrent.lib.streamquery.InnerJoinOperator }.
+ * @deprecated
+ */
+@Deprecated
+public class InnerJoinOperatorTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    InnerJoinOperator oper = new InnerJoinOperator();
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    // set column join condition
+    Condition cond = new JoinColumnEqualCondition("a", "a");
+    oper.setJoinCondition(cond);
+
+    // add columns
+    oper.selectTable1Column(new ColumnIndex("b", null));
+    oper.selectTable2Column(new ColumnIndex("c", null));
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport1.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport1.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 7);
+    tuple.put("c", 8);
+    oper.inport2.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport2.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(InnerJoinOperatorTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java
new file mode 100644
index 0000000..f78ba21
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+@Deprecated
+public class LeftOuterJoinOperatorTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    OuterJoinOperator oper = new OuterJoinOperator();
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    // set column join condition  
+    Condition cond = new JoinColumnEqualCondition("a", "a");
+    oper.setJoinCondition(cond);
+    
+    // add columns  
+    oper.selectTable1Column(new ColumnIndex("b", null));
+    oper.selectTable2Column(new ColumnIndex("c", null));
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport1.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport1.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 2);
+    tuple.put("b", 11);
+    tuple.put("c", 12);
+    oper.inport1.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 7);
+    tuple.put("c", 8);
+    oper.inport2.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport2.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LeftOuterJoinOperatorTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperatorTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperatorTest.java
new file mode 100644
index 0000000..88aa2d0
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/OrderByOperatorTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ *  Functional test for {@link 
com.datatorrent.lib.streamquery.OrderByOperatorTest}.
+ *  @deprecated
+ */
+@Deprecated
+public class OrderByOperatorTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // craete operator
+    OrderByOperator oper = new OrderByOperator();
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+    oper.addOrderByRule(new OrderByRule<Integer>("b"));
+    oper.setDescending(true);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("c", 2);
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 2);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 2);
+    tuple.put("b", 6);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 4);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 8);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(OrderByOperatorTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java
new file mode 100644
index 0000000..8142276
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+@Deprecated
+public class RightOuterJoinOperatorTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    OuterJoinOperator oper = new OuterJoinOperator();
+    oper.setRighttJoin();
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    // set column join condition  
+    Condition cond = new JoinColumnEqualCondition("a", "a");
+    oper.setJoinCondition(cond);
+    
+    // add columns  
+    oper.selectTable1Column(new ColumnIndex("b", null));
+    oper.selectTable2Column(new ColumnIndex("c", null));
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport1.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport1.process(tuple);
+
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 7);
+    tuple.put("c", 8);
+    oper.inport2.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport2.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 2);
+    tuple.put("b", 11);
+    tuple.put("c", 12);
+    oper.inport2.process(tuple);
+    
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RightOuterJoinOperatorTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperatorTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperatorTest.java
new file mode 100644
index 0000000..c5acbdd
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectOperatorTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition;
+
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional test for {@link 
org.apache.apex.malhar.contrib.misc.streamquery.SelectOperatorTest}.
+ * @deprecated
+ */
+@Deprecated
+public class SelectOperatorTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    SelectOperator oper = new SelectOperator();
+    oper.addIndex(new ColumnIndex("b", null));
+    oper.addIndex(new ColumnIndex("c", null));
+
+    EqualValueCondition condition = new EqualValueCondition();
+    condition.addEqualValue("a", 1);
+    oper.setCondition(condition);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SelectOperatorTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java
new file mode 100644
index 0000000..90480cf
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+@Deprecated
+public class SelectTopOperatorTest
+{
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Test
+  public void testOperator() throws Exception
+  {
+    SelectTopOperator oper = new SelectTopOperator();
+    oper.setTopValue(2);
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+    
+    oper.beginWindow(1);
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+    
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+    
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+    oper.endWindow();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SelectTopOperatorTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperatorTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperatorTest.java
new file mode 100644
index 0000000..f480cc7
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/UpdateOperatorTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+@Deprecated
+public class UpdateOperatorTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    UpdateOperator oper = new UpdateOperator();
+
+    EqualValueCondition condition = new EqualValueCondition();
+    condition.addEqualValue("a", 1);
+    oper.setCondition(condition);
+    oper.addUpdate("c", 100);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(UpdateOperatorTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java
new file mode 100644
index 0000000..01465db
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.advanced;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator;
+import 
org.apache.apex.malhar.contrib.misc.streamquery.condition.BetweenCondition;
+
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional test for {@link 
org.apache.apex.malhar.contrib.misc.streamquery.advanced.BetweenConditionTest}.
+ * @deprecated
+ */
+@Deprecated
+public class BetweenConditionTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    SelectOperator oper = new SelectOperator();
+    oper.addIndex(new ColumnIndex("b", null));
+    oper.addIndex(new ColumnIndex("c", null));
+
+    BetweenCondition cond = new BetweenCondition("a", 0, 2);
+    oper.setCondition(cond);
+
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 2);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 3);
+    tuple.put("b", 7);
+    tuple.put("c", 8);
+    oper.inport.process(tuple);
+    
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BetweenConditionTest.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java
new file mode 100644
index 0000000..e160e5d
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.advanced;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator;
+import 
org.apache.apex.malhar.contrib.misc.streamquery.condition.CompoundCondition;
+import 
org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition;
+
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional test for {@link 
org.apache.apex.malhar.contrib.misc.streamquery.advanced.CompoundConditionTest}.
+ * @deprecated
+ */
+@Deprecated
+public class CompoundConditionTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    SelectOperator oper = new SelectOperator();
+    oper.addIndex(new ColumnIndex("b", null));
+    oper.addIndex(new ColumnIndex("c", null));
+
+    EqualValueCondition left = new EqualValueCondition();
+    left.addEqualValue("a", 1);
+    EqualValueCondition  right = new EqualValueCondition();
+    right.addEqualValue("b", 1);
+
+    oper.setCondition(new CompoundCondition(left, right));
+
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 3);
+    tuple.put("b", 7);
+    tuple.put("c", 8);
+    oper.inport.process(tuple);
+    
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompoundConditionTest.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java
new file mode 100644
index 0000000..d641a1c
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.advanced;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator;
+import org.apache.apex.malhar.contrib.misc.streamquery.condition.InCondition;
+
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional test for {@link 
org.apache.apex.malhar.contrib.misc.streamquery.advanced.InConditionTest}.
+ * @deprecated
+ */
+@Deprecated
+public class InConditionTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    SelectOperator oper = new SelectOperator();
+    oper.addIndex(new ColumnIndex("b", null));
+    oper.addIndex(new ColumnIndex("c", null));
+
+    InCondition cond = new InCondition("a");
+    cond.addInValue(0);
+    cond.addInValue(1);
+    oper.setCondition(cond);
+
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 2);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 3);
+    tuple.put("b", 7);
+    tuple.put("c", 8);
+    oper.inport.process(tuple);
+    
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(InConditionTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/LikeConditionTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/LikeConditionTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/LikeConditionTest.java
new file mode 100644
index 0000000..e09d47a
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/LikeConditionTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.advanced;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator;
+import org.apache.apex.malhar.contrib.misc.streamquery.condition.LikeCondition;
+
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional test for {@link 
org.apache.apex.malhar.contrib.misc.streamquery.advanced.LikeConditionTest}.
+ * @deprecated
+ */
+@Deprecated
+public class LikeConditionTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    SelectOperator oper = new SelectOperator();
+    oper.addIndex(new ColumnIndex("b", null));
+    oper.addIndex(new ColumnIndex("c", null));
+
+    LikeCondition condition = new LikeCondition("a", "test*");
+    oper.setCondition(condition);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", "testing");
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", "null");
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", "testall");
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LikeConditionTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/NegateIndexTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/NegateIndexTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/NegateIndexTest.java
new file mode 100644
index 0000000..745c8c9
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/NegateIndexTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.advanced;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator;
+import org.apache.apex.malhar.contrib.misc.streamquery.index.NegateExpression;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional test for {@link 
org.apache.apex.malhar.contrib.misc.streamquery.SelectOperatorTest}.
+ * @deprecated
+ */
+@Deprecated
+public class NegateIndexTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    SelectOperator oper = new SelectOperator();
+    oper.addIndex(new NegateExpression("b", null));
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(NegateIndexTest.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectAverageTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectAverageTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectAverageTest.java
new file mode 100644
index 0000000..c1fa8b7
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectAverageTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.advanced;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.SelectFunctionOperator;
+import 
org.apache.apex.malhar.contrib.misc.streamquery.function.AverageFunction;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional test for {@link 
org.apache.apex.malhar.contrib.misc.streamquery.SelectOperatorTest}.
+ * @deprecated
+ */
+@Deprecated
+public class SelectAverageTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    SelectFunctionOperator oper = new SelectFunctionOperator();
+    oper.addSqlFunction(new AverageFunction("b", null));
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SelectAverageTest.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectCountTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectCountTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectCountTest.java
new file mode 100644
index 0000000..8ac1e33
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectCountTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.advanced;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.SelectFunctionOperator;
+import org.apache.apex.malhar.contrib.misc.streamquery.function.CountFunction;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional test for {@link 
org.apache.apex.malhar.contrib.misc.streamquery.SelectOperatorTest}.
+ * @deprecated
+ */
+@Deprecated
+public class SelectCountTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    SelectFunctionOperator oper = new SelectFunctionOperator();
+    oper.addSqlFunction(new CountFunction("b", null));
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", null);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", null);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SelectCountTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectFirstLastTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectFirstLastTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectFirstLastTest.java
new file mode 100644
index 0000000..17341da
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectFirstLastTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.advanced;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.SelectFunctionOperator;
+import 
org.apache.apex.malhar.contrib.misc.streamquery.function.FirstLastFunction;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional test for {@link 
org.apache.apex.malhar.contrib.misc.streamquery.SelectOperatorTest}.
+ * @deprecated
+ */
+@Deprecated
+public class SelectFirstLastTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    SelectFunctionOperator oper = new SelectFunctionOperator();
+    oper.addSqlFunction(new FirstLastFunction("b", null, false));
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", null);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", null);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SelectFirstLastTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectMaxMinTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectMaxMinTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectMaxMinTest.java
new file mode 100644
index 0000000..fa347c1
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SelectMaxMinTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.advanced;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.SelectFunctionOperator;
+import org.apache.apex.malhar.contrib.misc.streamquery.function.MaxMinFunction;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional test for {@link 
org.apache.apex.malhar.contrib.misc.streamquery.SelectOperatorTest}.
+ * @deprecated
+ */
+@Deprecated
+public class SelectMaxMinTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    SelectFunctionOperator oper = new SelectFunctionOperator();
+    oper.addSqlFunction(new MaxMinFunction("b", null, false));
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SelectMaxMinTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SumIndexTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SumIndexTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SumIndexTest.java
new file mode 100644
index 0000000..0d374dc
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/SumIndexTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.advanced;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator;
+import org.apache.apex.malhar.contrib.misc.streamquery.index.SumExpression;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional test for {@link 
org.apache.apex.malhar.contrib.misc.streamquery.SelectOperatorTest}.
+ * @deprecated
+ */
+@Deprecated
+public class SumIndexTest
+{
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // create operator
+    SelectOperator oper = new SelectOperator();
+    oper.addIndex(new SumExpression("b", "c", null));
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    HashMap<String, Object> tuple = new HashMap<String, Object>();
+    tuple.put("a", 0);
+    tuple.put("b", 1);
+    tuple.put("c", 2);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 3);
+    tuple.put("c", 4);
+    oper.inport.process(tuple);
+
+    tuple = new HashMap<String, Object>();
+    tuple.put("a", 1);
+    tuple.put("b", 5);
+    tuple.put("c", 6);
+    oper.inport.process(tuple);
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SumIndexTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/demos/yahoofinance/pom.xml
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/pom.xml b/demos/yahoofinance/pom.xml
index 12e5eb0..d92cf2a 100644
--- a/demos/yahoofinance/pom.xml
+++ b/demos/yahoofinance/pom.xml
@@ -49,6 +49,17 @@
       <artifactId>derby</artifactId>
       <version>10.9.1.0</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-contrib</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java
----------------------------------------------------------------------
diff --git 
a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java
 
b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java
index 50b306d..1a38495 100644
--- 
a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java
+++ 
b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java
@@ -18,14 +18,14 @@
  */
 package com.datatorrent.demos.yahoofinance;
 
+import 
org.apache.apex.malhar.contrib.misc.streamquery.AbstractSqlStreamOperator;
+import org.apache.apex.malhar.contrib.misc.streamquery.DerbySqlStreamOperator;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.streamquery.AbstractSqlStreamOperator;
-import com.datatorrent.lib.streamquery.DerbySqlStreamOperator;
 
 /**
  * This demo will output the stock market data from yahoo finance

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/AbstractStreamPatternMatcher.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/algo/AbstractStreamPatternMatcher.java
 
b/library/src/main/java/com/datatorrent/lib/algo/AbstractStreamPatternMatcher.java
deleted file mode 100644
index 252ea2f..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/algo/AbstractStreamPatternMatcher.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.Iterator;
-import java.util.List;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * <p>
- * This operator searches for a given pattern in the input stream.<br>
- * For e.g. If the pattern is defined as “aa” and your input events arrive 
in following manner “a”, “a”, “a”, then this operator
- * will emit 2 matches for the given pattern. One matching event 1 and 2 and 
other matching 2 and 3.
- * </p>
- *
- * <br>
- * <b> StateFull : Yes, </b> Pattern is found over application window(s). <br>
- * <b> Partitionable : No, </b> will yield wrong result. <br>
- *
- * <br>
- * <b>Ports</b>:<br>
- * <b>inputPort</b>: the port to receive input<br>
- *
- * <br>
- * <b>Properties</b>:<br>
- * <b>pattern</b>: The pattern that needs to be searched<br>
- *
- * @param <T> event type
- *
- * @since 2.0.0
- */
-
-@OperatorAnnotation(partitionable = false)
-public abstract class AbstractStreamPatternMatcher<T> extends BaseOperator
-{
-  /**
-   * The pattern to be searched in the input stream of events
-   */
-  @NotNull
-  private Pattern<T> pattern;
-
-  // this stores the index of the partial matches found so far
-  private List<MutableInt> partialMatches = Lists.newLinkedList();
-  private transient MutableInt patternLength;
-
-  /**
-   * Set the pattern that needs to be searched in the input stream of events
-   *
-   * @param pattern The pattern to be searched
-   */
-  public void setPattern(Pattern<T> pattern)
-  {
-    this.pattern = pattern;
-    partialMatches.clear();
-    patternLength = new MutableInt(pattern.getStates().length - 1);
-  }
-
-  @Override
-  public void setup(Context.OperatorContext context)
-  {
-    super.setup(context);
-    patternLength = new MutableInt(pattern.getStates().length - 1);
-  }
-
-  /**
-   * Get the pattern that is searched in the input stream of events
-   *
-   * @return Returns the pattern searched
-   */
-  public Pattern<T> getPattern()
-  {
-    return pattern;
-  }
-
-  public transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>()
-  {
-    @Override
-    public void process(T t)
-    {
-      if (pattern.checkState(t, 0)) {
-        partialMatches.add(new MutableInt(-1));
-      }
-      if (partialMatches.size() > 0) {
-        MutableInt tempInt;
-        Iterator<MutableInt> itr = partialMatches.iterator();
-        while (itr.hasNext()) {
-          tempInt = itr.next();
-          tempInt.increment();
-          if (!pattern.checkState(t, tempInt.intValue())) {
-            itr.remove();
-          } else if (tempInt.equals(patternLength)) {
-            itr.remove();
-            processPatternFound();
-          }
-        }
-      }
-    }
-  };
-
-  /**
-   * This function determines how to process the pattern found
-   */
-  public abstract void processPatternFound();
-
-  public static class Pattern<T>
-  {
-    /**
-     * The states of the pattern
-     */
-    @NotNull
-    private final T[] states;
-
-    //for kryo
-    private Pattern()
-    {
-      states = null;
-    }
-
-    public Pattern(@NotNull T[] states)
-    {
-      this.states = states;
-    }
-
-    /**
-     * Checks if the input state matches the state at index "index" of the 
pattern
-     *
-     * @param t     The input state
-     * @param index The index to match in the pattern
-     * @return True if the state exists at index "index" else false
-     */
-    public boolean checkState(T t, int index)
-    {
-      return states[index].equals(t);
-    }
-
-    /**
-     * Get the states of the pattern
-     *
-     * @return The states of the pattern
-     */
-    public T[] getStates()
-    {
-      return states;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/AllAfterMatchMap.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/algo/AllAfterMatchMap.java 
b/library/src/main/java/com/datatorrent/lib/algo/AllAfterMatchMap.java
deleted file mode 100644
index 82d2201..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/AllAfterMatchMap.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-
-import com.datatorrent.lib.util.BaseMatchOperator;
-
-/**
- * This operator takes Maps, whose values are numbers, as input tuples.&nbsp;
- * It then performs a numeric comparison on the values corresponding to one of 
the keys in the input tuple maps.&nbsp;
- * All tuples processed by the operator before the first successful comparison 
are not output by the operator,
- * all tuples processed by the operator after and including a successful 
comparison are output by the operator.
- *
- * <p>
- * A compare metric is done on input tuple based on the property "key",
- * "value", and "cmp" type. All tuples are emitted (inclusive) once a match is 
made.
- * The comparison is done by getting double value from the Number.
- * This module is a pass through<br>
- * <br>
- * <b> StateFull : Yes, </b> Count is aggregated over application window(s). 
<br>
- * <b> Partitions : No, </b> will yield wrong result. <br>
- * <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects Map&lt;K,V extends Number&gt;<br>
- * <b>allafter</b>: emits Map&lt;K,V extends Number&gt; if compare function
- * returns true<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>key</b>: The key on which compare is done<br>
- * <b>value</b>: The value to compare with<br>
- * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq",
- * "neq", "gt", "gte". Default is "eq"<br>
- * <br>
- * <b>Specific compile time checks</b>:<br>
- * Key must be non empty<br>
- * Value must be able to convert to a "double"<br>
- * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt",
- * "gte"<br>
- * <b>Specific run time checks</b>: None<br>
- * <br>
- * </p>
- *
- * @displayName Emit All After Match (Number)
- * @category Rules and Alerts
- * @tags filter, compare, numeric, key value
- *
- * @since 0.3.2
- */
-@OperatorAnnotation(partitionable = false)
-public class AllAfterMatchMap<K, V extends Number> extends
-    BaseMatchOperator<K, V>
-{
-  /**
-   * The input port on which tuples are received.
-   */
-  public final transient DefaultInputPort<Map<K, V>> data = new 
DefaultInputPort<Map<K, V>>()
-  {
-    /**
-     * Process HashMap<K,V> and emit all tuples at and after match
-     */
-    @Override
-    public void process(Map<K, V> tuple)
-    {
-      if (doemit) {
-        allafter.emit(cloneTuple(tuple));
-        return;
-      }
-      V v = tuple.get(getKey());
-      if (v == null) { // error tuple
-        return;
-      }
-      if (compareValue(v.doubleValue())) {
-        doemit = true;
-        allafter.emit(cloneTuple(tuple));
-      }
-    }
-  };
-
-  /**
-   * The output port on which all tuples after a match are emitted.
-   */
-  public final transient DefaultOutputPort<HashMap<K, V>> allafter = new 
DefaultOutputPort<HashMap<K, V>>();
-  boolean doemit = false;
-
-  /**
-   * Resets the matched variable
-   *
-   * @param windowId
-   */
-  @Override
-  public void beginWindow(long windowId)
-  {
-    doemit = false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/CompareExceptCountMap.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/algo/CompareExceptCountMap.java 
b/library/src/main/java/com/datatorrent/lib/algo/CompareExceptCountMap.java
index ba5e5a1..1989f24 100644
--- a/library/src/main/java/com/datatorrent/lib/algo/CompareExceptCountMap.java
+++ b/library/src/main/java/com/datatorrent/lib/algo/CompareExceptCountMap.java
@@ -61,8 +61,9 @@ import com.datatorrent.lib.util.UnifierSumNumber;
  * @tags count, key value
  *
  * @since 0.3.2
+ * @deprecated
  */
-
+@Deprecated
 @OperatorAnnotation(partitionable = true)
 public class CompareExceptCountMap<K, V extends Number> extends MatchMap<K, V>
 {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/Distinct.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/Distinct.java 
b/library/src/main/java/com/datatorrent/lib/algo/Distinct.java
index c694bd2..ce29c50 100644
--- a/library/src/main/java/com/datatorrent/lib/algo/Distinct.java
+++ b/library/src/main/java/com/datatorrent/lib/algo/Distinct.java
@@ -76,10 +76,7 @@ public class Distinct<K> extends BaseKeyOperator<K> 
implements Unifier<K>
     @Override
     public void process(K tuple)
     {
-      if (!map.containsKey(tuple)) {
-        distinct.emit(cloneKey(tuple));
-        map.put(cloneKey(tuple), null);
-      }
+      Distinct.this.process(tuple);
     }
   };
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/DistinctMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/DistinctMap.java 
b/library/src/main/java/com/datatorrent/lib/algo/DistinctMap.java
deleted file mode 100644
index e3d658b..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/DistinctMap.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-
-import com.datatorrent.lib.util.BaseKeyValueOperator;
-import com.datatorrent.lib.util.UnifierHashMap;
-
-/**
- * This operator computes and emits distinct key,val pairs (i.e drops 
duplicates).
- * <p>
- * Computes and emits distinct key,val pairs (i.e drops duplicates)
- * </p>
- * <p>
- * This is a pass through operator<br>
- * <br>
- * This module is same as a "FirstOf" metric on any key,val pair. At end of 
window all data is flushed.<br>
- * <br>
- * <b>StateFull : Yes, </b> tuple are compare across application window(s). 
<br>
- * <b>Partitions : Yes, </b> distinct output is unified by unifier hash map 
operator. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: Input data port expects Map&lt;K,V&gt;<br>
- * <b>distinct</b>: Output data port, emits HashMap&lt;K,V&gt;(1)<br>
- * <br>
- * </p>
- *
- * @displayName Distinct Key Value Merge
- * @category Stream Manipulators
- * @tags filter, unique, key value
- *
- * @since 0.3.2
- */
-
-@OperatorAnnotation(partitionable = true)
-public class DistinctMap<K, V> extends BaseKeyValueOperator<K, V>
-{
-  /**
-   * The input port on which key value pairs are received.
-   */
-  public final transient DefaultInputPort<Map<K, V>> data = new 
DefaultInputPort<Map<K, V>>()
-  {
-    /**
-     * Process HashMap<K,V> tuple on input port data, and emits if match not 
found. Updates the cache
-     * with new key,val pair
-     */
-    @Override
-    public void process(Map<K, V> tuple)
-    {
-      for (Map.Entry<K, V> e: tuple.entrySet()) {
-        HashMap<V, Object> vals = mapkeyval.get(e.getKey());
-        if ((vals == null) || !vals.containsKey(e.getValue())) {
-          HashMap<K, V> otuple = new HashMap<K, V>(1);
-          otuple.put(cloneKey(e.getKey()), cloneValue(e.getValue()));
-          distinct.emit(otuple);
-          if (vals == null) {
-            vals = new HashMap<V, Object>();
-            mapkeyval.put(cloneKey(e.getKey()), vals);
-          }
-          vals.put(cloneValue(e.getValue()), null);
-        }
-      }
-    }
-  };
-
-  /**
-   * The output port on which distinct key value pairs are emitted.
-   */
-  public final transient DefaultOutputPort<HashMap<K, V>> distinct = new 
DefaultOutputPort<HashMap<K, V>>()
-  {
-    @Override
-    public Unifier<HashMap<K, V>> getUnifier()
-    {
-      return new UnifierHashMap<K, V>();
-    }
-  };
-
-
-  protected HashMap<K, HashMap<V, Object>> mapkeyval = new HashMap<K, 
HashMap<V, Object>>();
-
-  /**
-   * Clears the cache/hash
-   */
-  @Override
-  public void endWindow()
-  {
-    mapkeyval.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/algo/FilterKeyVals.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/FilterKeyVals.java 
b/library/src/main/java/com/datatorrent/lib/algo/FilterKeyVals.java
deleted file mode 100644
index 41259a7..0000000
--- a/library/src/main/java/com/datatorrent/lib/algo/FilterKeyVals.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.algo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.api.annotation.Stateless;
-
-import com.datatorrent.lib.util.BaseKeyOperator;
-
-/**
- * This operator filters the incoming stream of tuples using a set of 
specified key value pairs.&nbsp;
- * Tuples that match the filter are emitted by the operator.
- * <p>
- * Filters the incoming stream based of specified key,val pairs, and emits 
those that match the filter. If
- * property "inverse" is set to "true", then all key,val pairs except those 
specified by in keyvals parameter are emitted
- * </p>
- * <p>
- * Operator assumes that the key, val pairs are immutable objects. If this 
operator has to be used for mutable objects,
- * override "cloneKey()" to make copy of K, and "cloneValue()" to make copy of 
V.<br>
- * This is a pass through node<br>
- * <br>
- * <b>StateFull : No, </b> tuple are processed in current window. <br>
- * <b>Partitions : Yes, </b> no dependency among input tuples. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b>data</b>: expects HashMap&lt;K,V&gt;<br>
- * <b>filter</b>: emits HashMap&lt;K,V&gt;(1)<br>
- * <br>
- * <b>Properties</b>:<br>
- * <b>keyvals</b>: The keyvals is key,val pairs to pass through, rest are 
filtered/dropped.<br>
- * <br>
- * </p>
- *
- * @displayName Filter Keyval Pairs
- * @category Rules and Alerts
- * @tags filter, key value
- *
- * @since 0.3.2
- */
-@Stateless
-@OperatorAnnotation(partitionable = true)
-public class FilterKeyVals<K,V> extends BaseKeyOperator<K>
-{
-  /**
-   * The input port on which key value pairs are received.
-   */
-  public final transient DefaultInputPort<HashMap<K, V>> data = new 
DefaultInputPort<HashMap<K, V>>()
-  {
-    /**
-     * Processes incoming tuples one key,val at a time. Emits if at least one 
key makes the cut
-     * By setting inverse as true, match is changed to un-matched
-     */
-    @Override
-    public void process(HashMap<K, V> tuple)
-    {
-      for (Map.Entry<K, V> e: tuple.entrySet()) {
-        entry.clear();
-        entry.put(e.getKey(),e.getValue());
-        boolean contains = keyvals.containsKey(entry);
-        if ((contains && !inverse) || (!contains && inverse)) {
-          HashMap<K, V> dtuple = new HashMap<K,V>(1);
-          dtuple.put(cloneKey(e.getKey()), cloneValue(e.getValue()));
-          filter.emit(dtuple);
-        }
-      }
-    }
-  };
-
-  /**
-   * The output port on which filtered key value pairs are emitted.
-   */
-  public final transient DefaultOutputPort<HashMap<K, V>> filter = new 
DefaultOutputPort<HashMap<K, V>>();
-
-  @NotNull()
-  HashMap<HashMap<K,V>,Object> keyvals = new HashMap<HashMap<K,V>,Object>();
-  boolean inverse = false;
-  private transient HashMap<K,V> entry = new HashMap<K,V>(1);
-
-  /**
-   * Gets the inverse property.
-   * @return inverse
-   */
-  public boolean getInverse()
-  {
-    return inverse;
-  }
-
-  /**
-   * If true then only matches are emitted. If false then only non matches are 
emitted.
-   * @param val
-   */
-  public void setInverse(boolean val)
-  {
-    inverse = val;
-  }
-
-  /**
-   * True means match; False means unmatched
-   * @return keyvals hash
-   */
-  @NotNull()
-  public HashMap<HashMap<K,V>,Object> getKeyVals()
-  {
-    return keyvals;
-  }
-
-  /**
-   * Adds a key to the filter list
-   * @param map with key,val pairs to set as filters
-   */
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  public void setKeyVals(HashMap<K,V> map)
-  {
-    for (Map.Entry<K, V> e: map.entrySet()) {
-      HashMap kvpair = new HashMap<K,V>(1);
-      kvpair.put(cloneKey(e.getKey()), cloneValue(e.getValue()));
-      keyvals.put(kvpair, null);
-    }
-  }
-
-  /*
-   * Clears the filter list
-   */
-  public void clearKeys()
-  {
-    keyvals.clear();
-  }
-
-  /**
-   * Clones V object. By default assumes immutable object (i.e. a copy is not 
made). If object is mutable, override this method
-   * @param v value to be cloned
-   * @return returns v as is (assumes immutable object)
-   */
-  public V cloneValue(V v)
-  {
-    return v;
-  }
-}

Reply via email to