http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java 
b/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java
deleted file mode 100644
index 18d9928..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.ArrayList;
-import java.util.Map;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Operator.Unifier;
-
-/**
- *  An implementation of Operator that provides sql order by operator semantic 
over live stream data. <br>
- * <p>
- * Input data rows are ordered by order rules, ordered result is emitted on 
output port. <br>
- * <br>
- *  *  <br>
- *  <b>StateFull : Yes,</b> Operator aggregates input over application window. 
<br>
- *  <b>Partitions : Yes, </b> This operator is also unifier on output port. 
<br>
- *  <br>
- * <b>Ports</b>:<br>
- * <b> inport : </b> Input hash map(row) port, expects 
HashMap&lt;String,Object&gt;<<br>
- * <b> outport : </b> Output hash map(row) port, emits  
HashMap&lt;String,Object&gt;<br>
- * <br>
- * <b> Properties : </b> <br>
- * <b> orderByRules : </b>List of order by rules for tuples.
- * @displayName OrderBy
- * @category Stream Manipulators
- * @tags orderby operator
- * @since 0.3.5
- */
-public class OrderByOperator implements Operator, Unifier<Map<String, Object>>
-{
-  /**
-   * Order by rules.
-   */
-  ArrayList<OrderByRule<?>> orderByRules = new ArrayList<OrderByRule<?>>();
-
-  /**
-   * Descending flag.
-   */
-  private boolean isDescending;
-
-  /**
-   * collected rows.
-   */
-  private ArrayList<Map<String, Object>> rows;
-
-  /**
-   * Add order by rule.
-   */
-  public void addOrderByRule(OrderByRule<?> rule)
-  {
-    orderByRules.add(rule);
-  }
-
-  /**
-   * @return isDescending
-   */
-  public boolean isDescending()
-  {
-    return isDescending;
-  }
-
-  /**
-   * @param isDescending isDescending
-   */
-  public void setDescending(boolean isDescending)
-  {
-    this.isDescending = isDescending;
-  }
-
-  @Override
-  public void process(Map<String, Object> tuple)
-  {
-    rows.add(tuple);
-  }
-
-  @Override
-  public void beginWindow(long arg0)
-  {
-    rows = new ArrayList<Map<String, Object>>();
-  }
-
-  @Override
-  public void endWindow()
-  {
-    for (int i = 0; i < orderByRules.size(); i++) {
-      rows = orderByRules.get(i).sort(rows);
-    }
-    if (isDescending) {
-      for (int i = 0; i < rows.size(); i++) {
-        outport.emit(rows.get(i));
-      }
-    } else {
-      for (int i = rows.size() - 1; i >= 0; i--) {
-        outport.emit(rows.get(i));
-      }
-    }
-  }
-
-  @Override
-  public void setup(OperatorContext arg0)
-  {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void teardown()
-  {
-    // TODO Auto-generated method stub
-
-  }
-
-  /**
-   * Input port that takes a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultInputPort<Map<String, Object>> inport = new 
DefaultInputPort<Map<String, Object>>()
-  {
-    @Override
-    public void process(Map<String, Object> tuple)
-    {
-      rows.add(tuple);
-    }
-  };
-
-  /**
-   * Output port that emits a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultOutputPort<Map<String, Object>> outport = new 
DefaultOutputPort<Map<String, Object>>()
-  {
-    @Override
-    public Unifier<Map<String, Object>> getUnifier()
-    {
-      OrderByOperator unifier = new OrderByOperator();
-      for (int i = 0; i < getOrderByRules().size(); i++) {
-        unifier.addOrderByRule(getOrderByRules().get(i));
-      }
-      unifier.setDescending(isDescending);
-      return unifier;
-    }
-  };
-
-  /**
-   * @return the orderByRules
-   */
-  public ArrayList<OrderByRule<?>> getOrderByRules()
-  {
-    return orderByRules;
-  }
-
-  /**
-   * The order by rules used to order incoming tuples.
-   * @param oredrByRules the orderByRules to set
-   */
-  public void setOrderByRules(ArrayList<OrderByRule<?>> oredrByRules)
-  {
-    this.orderByRules = oredrByRules;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java 
b/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java
deleted file mode 100644
index 8573903..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Implements order by key name rule. <br>
- * <p>
- * <b>Properties : </b> <br>
- * <b> columnName : </b> Name of column for ordering tuples. <br>
- * @displayName OrderBy Rule
- * @category Stream Manipulators
- * @tags orderby, sort, comparison
- * @since 0.3.3
- */
-@SuppressWarnings("rawtypes")
-public class OrderByRule<T extends Comparable>
-{
-
-  /**
-   * column name for ordering tuples.
-   */
-  private String columnName;
-
-  public OrderByRule(String name)
-  {
-
-    columnName = name;
-  }
-
-  /**
-   * sort rows by each rule and emit result on output port.
-   */
-  @SuppressWarnings("unchecked")
-  public ArrayList<Map<String, Object>> sort(ArrayList<Map<String, Object>> 
rows)
-  {
-
-    TreeMap<T, ArrayList<Map<String, Object>>> sorted = new TreeMap<T, 
ArrayList<Map<String, Object>>>();
-    for (int i = 0; i < rows.size(); i++) {
-      Map<String, Object> row = rows.get(i);
-      if (row.containsKey(columnName)) {
-        T value = (T)row.get(columnName);
-        ArrayList<Map<String, Object>> list;
-        if (sorted.containsKey(value)) {
-          list = sorted.get(value);
-        } else {
-          list = new ArrayList<Map<String, Object>>();
-          sorted.put(value, list);
-        }
-        list.add(row);
-      }
-    }
-    ArrayList<Map<String, Object>> result = new ArrayList<Map<String, 
Object>>();
-    for (Map.Entry<T, ArrayList<Map<String, Object>>> entry : 
sorted.entrySet()) {
-      result.addAll(entry.getValue());
-    }
-    return result;
-  }
-
-  /**
-   * @return the columnName
-   */
-  public String getColumnName()
-  {
-
-    return columnName;
-  }
-
-  /**
-   * @param columnName
-   *          the columnName to set
-   */
-  public void setColumnName(String columnName)
-  {
-
-    this.columnName = columnName;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java 
b/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java
deleted file mode 100644
index 0494bfb..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-/**
- * An operator that provides sql left,right and full outer join metric 
semantics on live stream. <br>
- * <p>
- * Please refer to {@link com.datatorrent.lib.streamquery.InnerJoinOperator} 
for
- * details.
- *
- * <b> Properties : </b> <br>
- * <b> isLeftJoin : </b> Left join flag. <br>
- * <b> isFullJoin : </b> Full join flag. <br>
- * @displayName Outer Join
- * @category Stream Manipulators
- * @tags sql, outer join operator
- * @since 0.3.4
- */
-public class OuterJoinOperator extends InnerJoinOperator
-{
-
-  private boolean isLeftJoin = true;
-  private boolean isFullJoin = false;
-
-  @Override
-  public void endWindow()
-  {
-    // full outer join
-    if (isFullJoin) {
-      for (int i = 0; i < table1.size(); i++) {
-        boolean merged = false;
-        for (int j = 0; j < table2.size(); j++) {
-          if ((joinCondition == null)
-              || (joinCondition.isValidJoin(table1.get(i), table2.get(j)))) {
-            merged = true;
-          }
-        }
-        if (!merged) {
-          joinRows(table1.get(i), null);
-        }
-      }
-      for (int i = 0; i < table2.size(); i++) {
-        boolean merged = false;
-        for (int j = 0; j < table1.size(); j++) {
-          if ((joinCondition == null)
-              || (joinCondition.isValidJoin(table1.get(j), table2.get(i)))) {
-            merged = true;
-          }
-        }
-        if (!merged) { // only output non merged rows
-          joinRows(null, table2.get(i));
-        }
-      }
-      return;
-    }
-
-    // left or right join
-    if (isLeftJoin) {
-      for (int i = 0; i < table1.size(); i++) {
-        boolean merged = false;
-        for (int j = 0; j < table2.size(); j++) {
-          if ((joinCondition == null)
-              || (joinCondition.isValidJoin(table1.get(i), table2.get(j)))) {
-            merged = true;
-          }
-        }
-        if (!merged) {
-          joinRows(table1.get(i), null);
-        }
-      }
-    } else {
-      for (int i = 0; i < table2.size(); i++) {
-        boolean merged = false;
-        for (int j = 0; j < table1.size(); j++) {
-          if ((joinCondition == null) || 
(joinCondition.isValidJoin(table1.get(j), table2.get(i)))) {
-            merged = true;
-          }
-        }
-        if (!merged) { // only output non merged rows
-          joinRows(null, table2.get(i));
-        }
-      }
-    }
-  }
-
-  public void setLeftJoin()
-  {
-    isLeftJoin = true;
-  }
-
-  public void setRighttJoin()
-  {
-    isLeftJoin = false;
-  }
-
-  public boolean isFullJoin()
-  {
-    return isFullJoin;
-  }
-
-  public void setFullJoin(boolean isFullJoin)
-  {
-    this.isFullJoin = isFullJoin;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java
deleted file mode 100644
index 77616f3..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import com.datatorrent.lib.streamquery.function.FunctionIndex;
-
-/**
- *  An implementation of Operator that applies sql top or limit semantics on 
incoming tuple(s). <br>
- * <p>
- * <b>StateFull : Yes,</b> Operator aggregates input over application window. 
<br>
- * <b>Partitions : No, </b> will yield wrong result(s). <br>
- * <br>
- * <b>Ports : </b> <br>
- * <b>inport : </b> expect tuple for type T. <br>
- * <b>outport : </b> emits tuple for type T. <br>
- * <br>
- * <b> Properties : </b> <br>
- * <b> functions : </b> Sql function for rows. <br>
- * @displayName Select Function
- * @category Stream Manipulators
- * @tags sql top, sql limit, sql select operator
- * @since 0.3.4
- */
-@OperatorAnnotation(partitionable = false)
-public class SelectFunctionOperator implements Operator
-{
-  /**
-   * array of rows.
-   */
-  private ArrayList<Map<String, Object>> rows;
-
-  /**
-   * Aggregate function for rows.
-   */
-  private ArrayList<FunctionIndex> functions = new ArrayList<FunctionIndex>();
-
-  /**
-   * Input port that takes a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultInputPort<Map<String, Object>> inport = new 
DefaultInputPort<Map<String, Object>>()
-  {
-
-    @Override
-    public void process(Map<String, Object> row)
-    {
-      rows.add(row);
-    }
-  };
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void teardown()
-  {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-    rows = new ArrayList<Map<String, Object>>();
-  }
-
-  @Override
-  public void endWindow()
-  {
-    if (functions.size() == 0) {
-      return;
-    }
-    Map<String, Object>  collect = new HashMap<String, Object>();
-    for (FunctionIndex function : functions) {
-      try {
-        function.filter(rows, collect);
-      } catch (Exception e) {
-        e.printStackTrace();
-        return;
-      }
-    }
-    outport.emit(collect);
-  }
-
-  /**
-   * Output port that emits a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultOutputPort<Map<String, Object>> outport = new 
DefaultOutputPort<Map<String, Object>>();
-
-  /**
-   * Add sql function.
-   * @param function  Sql function for rows.
-   */
-  public void addSqlFunction(FunctionIndex function)
-  {
-    functions.add(function);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java 
b/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java
deleted file mode 100644
index 4dbc1f0..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.streamquery.condition.Condition;
-import com.datatorrent.lib.streamquery.index.Index;
-
-/**
- * An implementation of that provides sql select query semantics on live data 
stream. <br>
- * <p>
- * Stream rows passing condition are emitted on output port stream. <br>
- * <br>
- * <b>StateFull : NO,</b> all row data is processed in current time window. 
<br>
- * <b>Partitions : Yes, </b> No Input dependency among input rows. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b> inport : </b> Input hash map(row) port, expects
- * HashMap&lt;String,Object&gt;<<br>
- * <b> outport : </b> Output hash map(row) port, emits
- * HashMap&lt;String,Object&gt;<br>
- * <br>
- * <b> Properties : <b> <br>
- * <b> condition : </b> Select condition for selecting rows. <br>
- * <b> columns : </b> Column names/aggregate functions for select. <br>
- * <br>
- * @displayName Select
- * @category Stream Manipulators
- * @tags sql select operator, index, sql condition
- * @since 0.3.3
- */
-public class SelectOperator extends BaseOperator
-{
-
-  /**
-   * select columns/expression;
-   */
-  private ArrayList<Index> indexes = new ArrayList<Index>();
-
-  /**
-   * condition.
-   */
-  private Condition condition = null;
-
-  /**
-   * add index.
-   */
-  public void addIndex(Index index)
-  {
-    indexes.add(index);
-  }
-
-  /**
-   * set condition.
-   */
-  public void setCondition(Condition condition)
-  {
-    this.condition = condition;
-  }
-
-  /**
-   * Input port that takes a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultInputPort<Map<String, Object>> inport = new 
DefaultInputPort<Map<String, Object>>()
-  {
-
-    @Override
-    public void process(Map<String, Object> tuple)
-    {
-      if ((condition != null) && (!condition.isValidRow(tuple))) {
-        return;
-      }
-      if (indexes.size() == 0) {
-        outport.emit(tuple);
-        return;
-      }
-      Map<String, Object> result = new HashMap<String, Object>();
-      for (int i = 0; i < indexes.size(); i++) {
-        indexes.get(i).filter(tuple, result);
-      }
-      outport.emit(result);
-    }
-  };
-
-  /**
-   * Output port that emits a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultOutputPort<Map<String, Object>> outport = new 
DefaultOutputPort<Map<String, Object>>();
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java 
b/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java
deleted file mode 100644
index c3ae083..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.ArrayList;
-import java.util.Map;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
-
-/**
- * An implementation of Operator that provides sql top select query semantic 
on live data stream. <br>
- * <p>
- * Stream rows passing condition are emitted on output port stream. <br>
- * <br>
- * <b>StateFull : NO,</b> all row data is processed in current time window. 
<br>
- * <b>Partitions : Yes, </b> No Input dependency among input rows. <br>
- * <br>
- * <b>Ports</b>:<br>
- * <b> inport : </b> Input hash map(row) port, expects
- * HashMap&lt;String,Object&gt;<<br>
- * <b> outport : </b> Output hash map(row) port, emits
- * HashMap&lt;String,Object&gt;<br>
- * <br>
- * <b> Properties : <b> <br>
- * <b> topValue : </b> top values count. <br>
- * <b> isPercentage : </b> top values count is percentage flag.
- * <br>
- * @displayName Select Top
- * @category Stream Manipulators
- * @tags sql select, sql top operator
- *  @since 0.3.4
- */
-public class SelectTopOperator implements Operator
-{
-  private ArrayList<Map<String, Object>> list;
-  private int topValue = 1;
-  private boolean isPercentage = false;
-
-  /**
-   * Input port that takes a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultInputPort<Map<String, Object>> inport = new 
DefaultInputPort<Map<String, Object>>()
-  {
-    @Override
-    public void process(Map<String, Object> tuple)
-    {
-      list.add(tuple);
-    }
-  };
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void teardown()
-  {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-    list = new ArrayList<Map<String, Object>>();
-  }
-
-  @Override
-  public void endWindow()
-  {
-    int numEmits = topValue;
-    if (isPercentage) {
-      numEmits = list.size() * (topValue / 100);
-    }
-    for (int i = 0; (i < numEmits) && (i < list.size()); i++) {
-      outport.emit(list.get(i));
-    }
-  }
-
-  public int getTopValue()
-  {
-    return topValue;
-  }
-
-  public void setTopValue(int topValue) throws Exception
-  {
-    if (topValue <= 0) {
-      throw new Exception("Top value must be positive number.");
-    }
-    this.topValue = topValue;
-  }
-
-  public boolean isPercentage()
-  {
-    return isPercentage;
-  }
-
-  public void setPercentage(boolean isPercentage)
-  {
-    this.isPercentage = isPercentage;
-  }
-
-  /**
-   * Output port that emits a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultOutputPort<Map<String, Object>> outport =  new 
DefaultOutputPort<Map<String, Object>>();
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java 
b/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java
deleted file mode 100644
index 6724a7e..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.streamquery.condition.Condition;
-
-/**
- *  An implementation of BaseOperator that provides sql update query semantic 
on live data stream. <br>
- *  <p>
- *  Stream rows passing condition are emitted on output port stream. <br>
- *  <br>
- *  <b>StateFull : NO,</b> all row data is processed in current time window. 
<br>
- *  <b>Partitions : Yes, </b> No Input dependency among input rows. <br>
- *  <br>
- * <b>Ports</b>:<br>
- * <b> inport : </b> Input hash map(row) port, expects 
HashMap&lt;String,Object&gt;<<br>
- * <b> outport : </b> Output hash map(row) port, emits  
HashMap&lt;String,Object&gt;<br>
- * <br>
- * <b> Properties : <b> <br>
- * <b> condition : </b> Select condition for selecting rows. <br>
- * <b> columns : </b> Column names/aggregate functions for select. <br>
- * <br>
- * @displayName Update
- * @category Stream Manipulators
- * @tags sql update operator, sql condition
- * @since 0.3.3
- */
-public class UpdateOperator extends BaseOperator
-{
-  /**
-   * Update value map.
-   */
-  Map<String, Object> updates = new HashMap<String, Object>();
-
-  /**
-   *  condition.
-   */
-  private Condition condition = null;
-
-  /**
-   * set condition.
-   */
-  public void setCondition(Condition condition)
-  {
-    this.condition = condition;
-  }
-
-  /**
-   * Input port that takes a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultInputPort<Map<String, Object>> inport = new 
DefaultInputPort<Map<String, Object>>()
-  {
-    @Override
-    public void process(Map<String, Object> tuple)
-    {
-      if ((condition != null) && (!condition.isValidRow(tuple))) {
-        return;
-      }
-      if (updates.size() == 0) {
-        outport.emit(tuple);
-        return;
-      }
-      Map<String, Object> result = new HashMap<String, Object>();
-      for (Map.Entry<String, Object> entry : tuple.entrySet()) {
-        if (updates.containsKey(entry.getKey())) {
-          result.put(entry.getKey(), updates.get(entry.getKey()));
-        } else {
-          result.put(entry.getKey(), entry.getValue());
-        }
-      }
-      outport.emit(result);
-    }
-  };
-
-  /**
-   * Output port that emits a map of &lt;string,object&gt;.
-   */
-  public final transient DefaultOutputPort<Map<String, Object>> outport =  new 
DefaultOutputPort<Map<String, Object>>();
-
-  /**
-   * Add update value.
-   */
-  public void addUpdate(String name, Object value)
-  {
-    updates.put(name, value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java
deleted file mode 100644
index 43cdc72..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.condition;
-
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-/**
- *  A derivation of Condition that validates row by checking if the given 
column name value lies between given left,right range. <br>
- * <p>
- * <b>Properties : </b> <br>
- * <b> column : </b> Name of column. <br>
- * <b> leftValue : </b> left range of column value. <br>
- * <b> rightValue : </b> right range od column value. <br>
- * <br>
- * @displayName Between Condition
- * @category Stream Manipulators
- * @tags sql condition
- * @since 0.3.4
- */
-public class BetweenCondition  extends Condition
-{
-  /**
-   * Column name to be checked.
-   */
-  @NotNull
-  private String column;
-
-  /**
-   * Left range value.
-   */
-  @NotNull
-  private Object leftValue;
-
-  /**
-   * Right range value.
-   */
-  @NotNull
-  private Object rightValue;
-
-  /**
-   * @param  column  Name of column, must be non null. <br>
-   * @param  leftValue  Left range for value, mut be non null. <br>
-   * @param  rightValue  right range for value, mut be non null. <br>
-   */
-  public BetweenCondition(@NotNull String column, @NotNull  Object leftValue, 
@NotNull Object rightValue)
-  {
-    this.column = column;
-    this.leftValue = leftValue;
-    this.rightValue = rightValue;
-  }
-
-  /**
-   * Validate given row.
-   */
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  @Override
-  public boolean isValidRow(@NotNull Map<String, Object> row)
-  {
-    if (!row.containsKey(column)) {
-      return false;
-    }
-    Object value = row.get(column);
-    if (value == null) {
-      return false;
-    }
-    if (((Comparable)value).compareTo((Comparable)leftValue) < 0) {
-      return false;
-    }
-    if (((Comparable)value).compareTo((Comparable)rightValue) > 0) {
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Must not be called.
-   */
-  @Override
-  public boolean isValidJoin(@NotNull Map<String, Object> row1, Map<String, 
Object> row2)
-  {
-    assert (false);
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java
deleted file mode 100644
index b4bd3ed..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.condition;
-
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-/**
- * A derivation of Condition index that implements logical AND/OR select 
expression. <br>
- * <p>
- * Class provides logical OR or AND function specified in parameters. User can 
implement
- * complex and/or expression by chaining operator itself.
- * <br>
- * <b> Properties : </b> <br>
- * <b> leftCondition : </b> Left validate row condition . <br>
- * <b> rightCondition : </b> Right validate row condition. <br>
- * <b> logicalOr : </b> OR/AND logical metric flag. <br>
- * <br>
- * @displayName Compound Condition
- * @category Stream Manipulators
- * @tags sql condition, logical
- * @since 0.3.4
- */
-public class CompoundCondition extends Condition
-{
-  /**
-   * Left validate row condition .
-   */
-  @NotNull
-  private Condition leftCondition;
-
-  /**
-   * Right validate row condition .
-   */
-  @NotNull
-  private Condition rightCondition;
-
-  /**
-   * AND/OR metric flag.
-   */
-  private boolean logicalOr = true;
-
-  /**
-   * Constructor for logical or metric.
-   *
-   * @param leftCondition  Left validate row condition, must be non null. <br>
-   * @param rightCondition Right validate row condition, must be non null. <br>
-   */
-  public CompoundCondition(Condition leftCondition, Condition rightCondition)
-  {
-    this.leftCondition = leftCondition;
-    this.rightCondition = rightCondition;
-  }
-
-  /**
-   * Constructor for logical and metric if logical and parameter is true.
-   * <br>
-   *
-   * @param leftCondition  Left validate row condition, must be non null. <br>
-   * @param rightCondition Right validate row condition, must be non null. <br>
-   * @param isLogicalAnd   Logical AND if true.
-   */
-  public CompoundCondition(Condition leftCondition, Condition rightCondition, 
boolean isLogicalAnd)
-  {
-    this.leftCondition = leftCondition;
-    this.rightCondition = rightCondition;
-    logicalOr = !isLogicalAnd;
-  }
-
-  @Override
-  public boolean isValidRow(Map<String, Object> row)
-  {
-    if (logicalOr) {
-      return leftCondition.isValidRow(row) || rightCondition.isValidRow(row);
-    } else {
-      return leftCondition.isValidRow(row) && rightCondition.isValidRow(row);
-    }
-  }
-
-  @Override
-  public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> 
row2)
-  {
-    // TODO Auto-generated method stub
-    return false;
-  }
-
-  public Condition getLeftCondition()
-  {
-    return leftCondition;
-  }
-
-  public void setLeftCondition(Condition leftCondition)
-  {
-    this.leftCondition = leftCondition;
-  }
-
-  public Condition getRightCondition()
-  {
-    return rightCondition;
-  }
-
-  public void setRightCondition(Condition rightCondition)
-  {
-    this.rightCondition = rightCondition;
-  }
-
-  public void setLogicalAnd()
-  {
-    this.logicalOr = false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java
deleted file mode 100644
index bb478cf..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.condition;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * An implementation of condition on column equality.
- * <p>
- * A valid row must have all key/value map in column name/value map.
- *
- * <b> Properties : </b> <br>
- *  <b> equalMap : </b> Column equal value map store.
- * @displayName Equal Value Condition
- * @category Stream Manipulators
- * @tags sql condition
- * @since 0.3.4
- */
-public class EqualValueCondition extends Condition
-{
-
-  /**
-   * Equal column map.
-   */
-  private HashMap<String, Object> equalMap = new HashMap<String, Object>();
-
-  /**
-   * Add column equal condition.
-   */
-  public void addEqualValue(String column, Object value)
-  {
-    equalMap.put(column, value);
-  }
-
-  /**
-   * Check valid row.
-   */
-  @Override
-  public boolean isValidRow(Map<String, Object> row)
-  {
-    // no conditions
-    if (equalMap.size() == 0) {
-      return true;
-    }
-
-    // compare each condition value
-    for (Map.Entry<String, Object> entry : equalMap.entrySet()) {
-      if (!row.containsKey(entry.getKey())) {
-        return false;
-      }
-      Object value = row.get(entry.getKey());
-      if (entry.getValue() == null) {
-        if (value == null) {
-          return true;
-        }
-        return false;
-      }
-      if (value == null) {
-        return false;
-      }
-      if (!entry.getValue().equals(value)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * check valid join, not implemented
-   *
-   * @return false
-   */
-  @Override
-  public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> 
row2)
-  {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java
deleted file mode 100644
index 7877053..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.condition;
-
-import java.util.ArrayList;
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-import com.datatorrent.lib.streamquery.function.FunctionIndex;
-
-/**
- *  A derivation of HavingCondition that implements comparison of aggregate 
index value to input compare value. <br>
- * <p>
- * Compare value must implement interface Comparable. <br>
- * <br>
- * <b> Properties : </b>
- *  <b> compareValue : </b>  Value to be compared. <br>
- *  <b>  compareType : </b> Type of comparison -1 == lt, 0 == eq, 1 == gt. <br>
- * @displayName Having Compare Value
- * @category Stream Manipulators
- * @tags compare, sql condition
- * @since 0.3.4
- */
-@SuppressWarnings("rawtypes")
-public class HavingCompareValue<T extends Comparable>   extends HavingCondition
-{
-  /**
-   * Value to be compared.
-   */
-  private T compareValue;
-
-  /**
-   * Type of comparison -1 == lt, 0 == eq, 1 == gt.
-   */
-  private int compareType;
-
-  /**
-   * @param aggregateIndex   aggregate index for comparison. <br>
-   * @param compareValue     Value to be compared. <br>
-   * @param compareType    Type of comparison -1 == lt, 0 == eq, 1 == gt. <br>
-   */
-  public HavingCompareValue(FunctionIndex aggregateIndex, T compareValue, int 
compareType)
-  {
-    super(aggregateIndex);
-    this.compareValue = compareValue;
-    this.compareType  = compareType;
-  }
-
-  /**
-   * Validate aggregate override. <br>
-   */
-  @SuppressWarnings("unchecked")
-  @Override
-  public boolean isValidAggregate(@NotNull ArrayList<Map<String, Object>> 
rows) throws Exception
-  {
-    Object computed = aggregateIndex.compute(rows);
-    return (compareType == compareValue.compareTo(computed));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java
deleted file mode 100644
index 6dac690..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.condition;
-
-import java.util.ArrayList;
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-import com.datatorrent.lib.streamquery.function.FunctionIndex;
-
-/**
- *  A base class for Group,Having operator with aggregate index 
constraint.&nsbsp; Subclasses should provide the
-    implementation to check if aggregate is valid.
- * <p>
- * @displayName Having Condition
- * @category Stream Manipulators
- * @tags sql condition, index, group
- * @since 0.3.4
- */
-public abstract class HavingCondition
-{
-  /**
-   * Aggregate index to be validated.
-   */
-  protected FunctionIndex  aggregateIndex = null;
-
-  /**
-   * @param aggregateIndex Aggregate index to be validated.
-   */
-  public HavingCondition(FunctionIndex aggregateIndex)
-  {
-    this.aggregateIndex = aggregateIndex;
-  }
-
-  /**
-   *  Check if aggregate is valid.
-   */
-  public abstract boolean isValidAggregate(@NotNull ArrayList<Map<String, 
Object>> rows) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java
deleted file mode 100644
index 236f3b1..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.condition;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import javax.validation.constraints.NotNull;
-
-/**
- * An implementation of condition class to check if a column value is in a 
given set of values.
- * <p>
- * <br>
- * <b>Properties : </b> <br>
- * <b> column : </b> Column name for which value is checked in values set. <br>
- * <b> inValues : </b> Set of values in which column value is checked. <br>
- * @displayName In Condition
- * @category Stream Manipulators
- * @tags sql condition
- * @since 0.3.4
- */
-public class InCondition extends Condition
-{
-  /**
-   * Column name for which value is checked in values set.
-   */
-  @NotNull
-  private String column;
-
-  /**
-   * Set of values in which column value is checked.
-   */
-  private Set<Object> inValues = new HashSet<Object>();
-
-  /**
-   * @param column Column name for which value is checked in values set.
-   */
-  public InCondition(@NotNull String column)
-  {
-    this.column = column;
-  }
-
-  @Override
-  public boolean isValidRow(@NotNull Map<String, Object> row)
-  {
-    if (!row.containsKey(column)) {
-      return false;
-    }
-    return inValues.contains(row.get(column));
-  }
-
-  @Override
-  public boolean isValidJoin(@NotNull Map<String, Object> row1, @NotNull 
Map<String, Object> row2)
-  {
-    return false;
-  }
-
-  public String getColumn()
-  {
-    return column;
-  }
-
-  public void setColumn(String column)
-  {
-    this.column = column;
-  }
-
-  public void addInValue(Object value)
-  {
-    this.inValues.add(value);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java
index d350edc..4d8199e 100644
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java
+++ 
b/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java
@@ -23,7 +23,6 @@ import java.util.Map;
 
 import javax.validation.constraints.NotNull;
 
-
 /**
  * An implementation of equal join condition class.
  * <p>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java
deleted file mode 100644
index b3d7174..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.condition;
-
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.validation.constraints.NotNull;
-
-
-/**
- * An implementation of condition class to filter rows for which given column 
name value matches given regular expression. <br>
- *<p>
- *<b> Properties : </b> <br>
- *<b> column : < /b> Column to be matched with regular expression. <br>
- *<b> pattern : </b> Regular expression pattern.<br>
- * @displayName Like Condition
- * @category Stream Manipulators
- * @tags sql, like condition, regular expression
- * @since 0.3.4
- */
-public class LikeCondition extends Condition
-{
-  /**
-   * Column to be matched with regular expression.
-   */
-  @NotNull
-  private String column;
-
-  /**
-   * Regular expression pattern.
-   */
-  @NotNull
-  private Pattern pattern;
-
-  /**
-   * @param column Column to be matched with regular expression, must be 
non-null.
-   * @param pattern Regular expression pattern, must be non-null.
-   */
-  public LikeCondition(@NotNull String column,@NotNull String pattern)
-  {
-    setColumn(column);
-    setPattern(pattern);
-  }
-
-  /**
-   * For valid row column value string must match regular expression.
-   * @return row valid status.
-   */
-  @Override
-  public boolean isValidRow(Map<String, Object> row)
-  {
-    if (!row.containsKey(column)) {
-      return false;
-    }
-    Matcher match = pattern.matcher((CharSequence)row.get(column));
-    return match.find();
-  }
-
-  /**
-   * Must not be called.
-   */
-  @Override
-  public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> 
row2)
-  {
-    assert (false);
-    return false;
-  }
-
-  public String getColumn()
-  {
-    return column;
-  }
-
-  public void setColumn(String column)
-  {
-    this.column = column;
-  }
-
-  public void setPattern(String pattern)
-  {
-    this.pattern = Pattern.compile(pattern);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java
deleted file mode 100644
index e212ff8..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.function;
-
-import java.util.ArrayList;
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang.StringUtils;
-
-/**
- * An implementation of function index that implements average function 
semantics. <br>
- * <p>
- *   e.g : sql => SELECT AVG(column_name) FROM table_name. <br>
- *   <br>
- *   <b> Properties : </b> <br>
- *   <b> column : </b> Aggregate over given column values.   <br>
- *   <b> alias  : </b> Alias name for aggregate output. <br>
- * @displayName Average Function
- * @category Stream Manipulators
- * @tags sql average
- * @since 0.3.4
- */
-public class AverageFunction  extends FunctionIndex
-{
-  /**
-   * @param column Aggregate over given column values, must be non null.
-   * @param alias  Alias name for aggregate output.
-   */
-  public AverageFunction(@NotNull String column, String alias)
-  {
-    super(column, alias);
-  }
-
-  /**
-   * Compute average for given column values.
-   */
-  @Override
-  public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws 
Exception
-  {
-    if (rows.size() == 0) {
-      return 0.0;
-    }
-    double sum = 0.0;
-    for (Map<String, Object> row : rows) {
-      sum += ((Number)row.get(column)).doubleValue();
-    }
-    return sum / rows.size();
-  }
-
-  /**
-   * Get aggregate name.
-   * @return name.
-   */
-  @Override
-  protected String aggregateName()
-  {
-    if (!StringUtils.isEmpty(alias)) {
-      return alias;
-    }
-    return "AVG(" + column + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java
deleted file mode 100644
index dafe54e..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.function;
-
-import java.util.ArrayList;
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang.StringUtils;
-
-/**
- * An implementation of function index that implements sql count function 
semantic. <br>
- * <p>
- * Counts number of values of given column and returns count of non null 
values in column.
- *   e.g : sql => SELECT COUNT(column_name) FROM table_name. <br>
- *   <br>
- *   <b> Properties : </b> <br>
- *   <b> column : </b> column name for values count.   <br>
- *   <b> alias  : </b> Alias name for aggregate output. <br>
- * @displayName Count Function
- * @category Stream Manipulators
- * @tags sql count
- * @since 0.3.4
- */
-public class CountFunction extends FunctionIndex
-{
-  /**
-   * @param column column for values count, must be non null.
-   * @param alias  Alias name for aggregate output.
-   */
-  public CountFunction(@NotNull String column, String alias)
-  {
-    super(column, alias);
-  }
-
-  /**
-   * Count number of values of given column.
-   * @return Count of non null values in column.
-   */
-  @Override
-  public Object compute(ArrayList<Map<String, Object>> rows) throws Exception
-  {
-    if (column.equals("*")) {
-      return rows.size();
-    }
-    long count = 0;
-    for (Map<String, Object> row : rows) {
-      if (row.containsKey(column) && (row.get(column) != null)) {
-        count++;
-      }
-    }
-    return count;
-  }
-
-  /**
-   * Aggregate output name.
-   * @return name string.
-   */
-  @Override
-  protected String aggregateName()
-  {
-    if (!StringUtils.isEmpty(alias)) {
-      return alias;
-    }
-    return "COUNT(" + column + ")";
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java
deleted file mode 100644
index db2c2b7..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.function;
-
-import java.util.ArrayList;
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang.StringUtils;
-
-/**
- * An implementation of function index that implements sql first,last function 
semantic. <br>
- * <p>
- *   e.g : sql => SELECT FIRST/LAST(column_name) FROM table_name. <br>
- *   <br>
- *   <b> Properties : </b> <br>
- *   <b> column : </b> column name for first/last value.   <br>
- *   <b> alias  : </b> Alias name for output. <br>
- *   <b> isFirst : </b> return first value if true.
- * @displayName First Last Function
- * @category Stream Manipulators
- * @tags sql first, sql last
- * @since 0.3.4
- */
-public class FirstLastFunction extends FunctionIndex
-{
-  /**
-   * return first value if true.
-   */
-  private boolean isFirst;
-
-  /**
-   * @param column  column name for first/last value.
-   * @param  alias   Alias name for output.
-   * @param  isFirst return first value if true.
-   */
-  public FirstLastFunction(@NotNull String column, String alias, boolean 
isLast)
-  {
-    super(column, alias);
-    isFirst = !isLast;
-  }
-
-  /**
-   * Get first/last non null value for column.
-   */
-  @Override
-  public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws 
Exception
-  {
-    if (rows.size() == 0) {
-      return null;
-    }
-    if (isFirst) {
-      for (int i = 0; i < rows.size(); i++) {
-        if (rows.get(i).get(column) != null) {
-          return rows.get(i).get(column);
-        }
-      }
-    } else {
-      for (int i = (rows.size() - 1); i >= 0; i--) {
-        if (rows.get(i).get(column) != null) {
-          return rows.get(i).get(column);
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Aggregate output name.
-   * @return name string.
-   */
-  @Override
-  protected String aggregateName()
-  {
-    if (!StringUtils.isEmpty(alias)) {
-      return alias;
-    }
-    if (isFirst) {
-      return "FIRST(" + column + ")";
-    }
-    return "LAST(" + column + ")";
-  }
-
-  public boolean isFirst()
-  {
-    return isFirst;
-  }
-
-  public void setFirst(boolean isFirst)
-  {
-    this.isFirst = isFirst;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java
deleted file mode 100644
index 918ca89..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.function;
-
-import java.util.ArrayList;
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-/**
- * A base class for select aggregate function implementation.&nbsp; Subclasses 
should provide the
-   implementation for aggregate compute functions.
- * <p>
- * <br>
- * <b>Properties : </b> <br>
- * <b>column : </b> Column name for aggregation.
- * <b>alias : </b> Output value alias name.
- * @displayName Function Index
- * @category Stream Manipulators
- * @tags sql aggregate
- * @since 0.3.4
- */
-public abstract class FunctionIndex
-{
-  /**
-   * Column name.
-   */
-  @NotNull
-  protected String column;
-
-  /**
-   * Alias name.
-   */
-  protected String alias;
-
-  /**
-   * @param column Column name for aggregation.
-   * @param alias Output value alias name.
-   */
-  public FunctionIndex(@NotNull String column, String alias)
-  {
-    this.column = column;
-    this.alias = alias;
-  }
-
-  /**
-   * Aggregate compute function, implementation in sub class.
-   * @param rows Tuple list over application window.
-   * @return aggregate result object.
-   */
-  public abstract Object compute(@NotNull ArrayList<Map<String, Object>> rows) 
throws Exception;
-
-  /**
-   * Get aggregate output value name.
-   * @return name string.
-   */
-  protected abstract String aggregateName();
-
-  /**
-   * Apply compute function to given rows and store result in collect by 
output value name.
-   * @param  rows Tuple list over application window.
-   */
-  public void filter(ArrayList<Map<String, Object>> rows, Map<String, Object> 
collect) throws Exception
-  {
-    if (rows == null) {
-      return;
-    }
-    String name = column;
-    if (alias != null) {
-      name = alias;
-    }
-    if (name == null) {
-      name = aggregateName();
-    }
-    collect.put(name, compute(rows));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java
deleted file mode 100644
index f02e82c..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.function;
-
-import java.util.ArrayList;
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang.StringUtils;
-
-/**
- * An implementation of function index that implements sql max and sql min 
function semantic. <br>
- * <p>
- *   e.g : sql => SELECT MAX/MIN(column_name) FROM table_name. <br>
- *   <br>
- *   <b> Properties : </b> <br>
- *   <b> column : </b> column name for values max/min computation.   <br>
- *   <b> alias  : </b> Alias name for  output value. <br>
- *   <b> isMax : </b> Flag to indicate max/min compute value. <br>
- * @displayName Max Min Function
- * @category Stream Manipulators
- * @tags sql max, sql min
- * @since 0.3.4
- */
-public class MaxMinFunction extends FunctionIndex
-{
-  /**
-   * Flag to indicate max/min compute value, compute max if true.
-   */
-  private boolean isMax = true;
-
-  /**
-   * @param column column name for values max/min computation.   <br>
-   * @param alias  Alias name for output. <br>
-   * @param isMax  Flag to indicate max/min compute value. <br>
-   */
-  public MaxMinFunction(@NotNull String column, String alias, boolean isMin)
-  {
-    super(column, alias);
-    isMax = !isMin;
-  }
-
-  /**
-   * Compute max/min for given column.
-   * @return max/min value.
-   */
-  @Override
-  public Object compute(ArrayList<Map<String, Object>> rows) throws Exception
-  {
-    double minMax = 0.0;
-    for (Map<String, Object> row : rows) {
-      double value = ((Number)row.get(column)).doubleValue();
-      if ((isMax && (minMax < value)) || (!isMax && (minMax > value))) {
-        minMax = value;
-      }
-    }
-    return minMax;
-  }
-
-  /**
-   * Aggregate output name.
-   * @return name string.
-   */
-  @Override
-  protected String aggregateName()
-  {
-    if (!StringUtils.isEmpty(alias)) {
-      return alias;
-    }
-    if (isMax) {
-      return "MAX(" + column + ")";
-    }
-    return "MIN(" + column + ")";
-  }
-
-  public boolean isMax()
-  {
-    return isMax;
-  }
-
-  public void setMax(boolean isMax)
-  {
-    this.isMax = isMax;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java
deleted file mode 100644
index 02186cd..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.function;
-
-import java.util.ArrayList;
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-
-
-/**
- * <p> An implementation of sql sum function. </p>
- * <p>
- * @displayName Sum Function
- * @category Stream Manipulators
- * @tags sql sum, aggregate
- * @since 0.3.4
- */
-public class SumFunction extends FunctionIndex
-{
-  public SumFunction(String column, String alias) throws Exception
-  {
-    super(column, alias);
-  }
-
-  @Override
-  public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws 
Exception
-  {
-    Double result = 0.0;
-    for (Map<String, Object> row : rows) {
-      if (!row.containsKey(column)) {
-        continue;
-      }
-      result += ((Number)row.get(column)).doubleValue();
-    }
-    return result;
-  }
-
-  @Override
-  protected String aggregateName()
-  {
-    return "Sum(" + column;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java
deleted file mode 100644
index 21c1d11..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.index;
-
-import javax.validation.constraints.NotNull;
-
-
-/**
- * Abstract class to filter row by binary expression index.
- * <p>
- * Sub class will implement filter/getExpressionName functions.
- * @displayName Binary Expression
- * @category Stream Manipulators
- * @tags alias
- * @since 0.3.4
- */
-public abstract class BinaryExpression  implements Index
-{
-  /**
-   * Left column name argument for expression.
-   */
-  @NotNull
-  protected String left;
-
-  /**
-   * Right column name argument for expression.
-   */
-  @NotNull
-  protected String right;
-
-  /**
-   *  Alias name for output field.
-   */
-  protected String alias;
-
-  /**
-   * @param left column name argument for expression.
-   * @param right column name argument for expression.
-   * @param alias name for output field.
-   */
-  public BinaryExpression(@NotNull String left, @NotNull String right, String 
alias)
-  {
-    this.left = left;
-    this.right = right;
-  }
-
-  public String getAlias()
-  {
-    return alias;
-  }
-
-  public void setAlias(String alias)
-  {
-    this.alias = alias;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java 
b/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java
deleted file mode 100644
index 931ddaa..0000000
--- a/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.index;
-
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-/**
- * <p>An implementation of Column Index that implements filter method based on 
mid index. </p>
- * <p>
- * @displayName Mid Index
- * @category Stream Manipulators
- * @tags index
- * @since 0.3.4
- */
-public class MidIndex extends ColumnIndex
-{
-  private int start;
-  private int length = 0;
-
-  public MidIndex(@NotNull String column, String alias, int start)
-  {
-    super(column, alias);
-    assert (start >= 0);
-    this.start = start;
-  }
-
-  @Override
-  public void filter(@NotNull  Map<String, Object> row, @NotNull  Map<String, 
Object> collect)
-  {
-    if (!row.containsKey(column)) {
-      return;
-    }
-    if (!(row.get(column) instanceof String)) {
-      assert (false);
-    }
-    String name = getColumn();
-    if (alias != null) {
-      name = alias;
-    }
-
-    int endIndex = start + length;
-    if ((length == 0) || (endIndex > ((String)row.get(column)).length())) {
-      collect.put(name, row.get(column));
-    } else {
-      collect.put(name, ((String)row.get(column)).substring(start, endIndex));
-    }
-  }
-
-  public int getLength()
-  {
-    return length;
-  }
-
-  public void setLength(int length)
-  {
-    assert (length > 0);
-    this.length = length;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java
deleted file mode 100644
index 969e3af..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.index;
-
-import java.util.Map;
-
-import javax.validation.constraints.Null;
-
-
-/**
- * An implementation of Unary Expression that implements filter method using 
negate metric sql semantic on column value.
- * <p>
- * @displayName Negate Expression
- * @category Stream Manipulators
- * @tags expression, alias
- * @since 0.3.4
- */
-public class NegateExpression extends UnaryExpression
-{
-
-  /**
-   * @param column   Name of column value to be negated.
-   */
-  public NegateExpression(@Null String column, String alias)
-  {
-    super(column, alias);
-    if (this.alias == null) {
-      this.alias = "NEGATE(" + column + ")";
-    }
-  }
-
-  /* (non-Javadoc)
-   * @see com.datatorrent.lib.streamquery.index.Index#filter(java.util.Map, 
java.util.Map)
-   */
-  @Override
-  public void filter(Map<String, Object> row, Map<String, Object> collect)
-  {
-    if (!row.containsKey(column)) {
-      return;
-    }
-    collect.put(alias, -((Number)row.get(column)).doubleValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java
deleted file mode 100644
index 90e16a1..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.index;
-
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-/**
- * <p>An implementation of column index that implements filter method using 
Round Double Index. </p>
- *
- * @displayName Round Double Index
- * @category Stream Manipulators
- * @tags alias, maths
- * @since 0.3.4
- */
-public class RoundDoubleIndex  extends ColumnIndex
-{
-  private int rounder;
-  public RoundDoubleIndex(@NotNull String column, String alias, int 
numDecimals)
-  {
-    super(column, alias);
-    rounder = 1;
-    if (numDecimals > 0) {
-      rounder = (int)Math.pow(10, numDecimals);
-    }
-  }
-
-  @Override
-  public void filter(@NotNull  Map<String, Object> row, @NotNull  Map<String, 
Object> collect)
-  {
-    if (!row.containsKey(column)) {
-      return;
-    }
-    double value = (Double)row.get(column);
-    value = Math.round(value * rounder) / rounder;
-    String name = getColumn();
-    if (alias != null) {
-      name = alias;
-    }
-    collect.put(name, value);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java
deleted file mode 100644
index 2c49a79..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.index;
-
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-/**
- * <p>An implementation of Column Index that implements filter method using 
case of a string index. </p>
- *
- * @displayName String Case Index
- * @category Stream Manipulators
- * @tags alias
- * @since 0.3.4
- */
-public class StringCaseIndex extends  ColumnIndex
-{
-  private boolean toUpperCase = true;
-  public StringCaseIndex(@NotNull String column, String alias, boolean 
toLowerCase)
-  {
-    super(column, alias);
-    toUpperCase = !toLowerCase;
-  }
-
-  @Override
-  public void filter(@NotNull  Map<String, Object> row, @NotNull  Map<String, 
Object> collect)
-  {
-    if (!row.containsKey(column)) {
-      return;
-    }
-    if (!(row.get(column) instanceof String)) {
-      assert (false);
-    }
-
-    String name = getColumn();
-    if (alias != null) {
-      name = alias;
-    }
-    if (toUpperCase) {
-      collect.put(name, ((String)row.get(column)).toUpperCase());
-    } else {
-      collect.put(name, ((String)row.get(column)).toLowerCase());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java
deleted file mode 100644
index 4dbfee1..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.index;
-
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-/**
- * <p>An implementation of Column Index that implements filter method using 
length of a string Index. </p>
- * <p>
- * @displayName String Length Index
- * @category Stream Manipulators
- * @tags alias
- * @since 0.3.4
- */
-public class StringLenIndex  extends ColumnIndex
-{
-  public StringLenIndex(@NotNull String column, String alias)
-  {
-    super(column, alias);
-  }
-
-  @Override
-  public void filter(@NotNull  Map<String, Object> row, @NotNull  Map<String, 
Object> collect)
-  {
-    if (!row.containsKey(column)) {
-      return;
-    }
-    if (!(row.get(column) instanceof String)) {
-      assert (false);
-    }
-
-    String name = getColumn();
-    if (alias != null) {
-      name = alias;
-    }
-    collect.put(name, ((String)row.get(column)).length());
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java
 
b/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java
deleted file mode 100644
index a0144da..0000000
--- 
a/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.index;
-
-import java.util.Map;
-
-import javax.validation.constraints.NotNull;
-
-
-/**
- * Implements sum on column index.
- * <p>
- * Select index class for implementing sum column index.
- * @displayName Sum Expression
- * @category Stream Manipulators
- * @tags sum
- * @since 0.3.4
- */
-public class SumExpression extends BinaryExpression
-{
-
-  /**
-   * @param left column name argument for expression.
-   * @param right column name argument for expression.
-   * @param alias name for output field.
-   */
-  public SumExpression(@NotNull String left, @NotNull String right, String 
alias)
-  {
-    super(left, right, alias);
-    if (this.alias == null) {
-      this.alias = "SUM(" + left + "," + right + ")";
-    }
-  }
-
-  /* sum column values.
-   * @see com.datatorrent.lib.streamquery.index.Index#filter(java.util.Map, 
java.util.Map)
-   */
-  @Override
-  public void filter(Map<String, Object> row, Map<String, Object> collect)
-  {
-    if (!row.containsKey(left) || !row.containsKey(right)) {
-      return;
-    }
-    collect.put(alias, ((Number)row.get(left)).doubleValue() + 
((Number)row.get(right)).doubleValue());
-  }
-
-}

Reply via email to