http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java
new file mode 100755
index 0000000..8209445
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.filter;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.entity.EntityQualifierUtils;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.Qualifier;
+import org.apache.eagle.common.ByteUtil;
+import org.apache.eagle.query.parser.*;
+import org.apache.hadoop.hbase.filter.*;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * The steps of building HBase filters:
+ * 1. receive an ORExpression from eagle-antlr
+ * 2. iterate over all ANDExpressions in the ORExpression
+ *    2.1 add the filters built for each ANDExpression into the top-level filter list with the MUST_PASS_ONE option
+ *    2.2 iterate over all AtomicExpressions in the ANDExpression
+ *       2.2.1 group AtomicExpressions into 2 groups by looking up metadata: one for tag filters, the other for column filters
+ *       2.2.2 put the above 2 filters into a filter list with the MUST_PASS_ALL option
+ */
+public class HBaseFilterBuilder {
+       private static final Logger LOG = LoggerFactory.getLogger(HBaseFilterBuilder.class);
+
+       /**
+        * Syntax is @<fieldname>
+        */
+//     private static final String fnRegex = "^@(.*)$";
+       private static final Pattern _fnPattern = TokenConstant.ID_PATTERN; // Pattern.compile(fnRegex);
+       private static final Charset _defaultCharset = Charset.forName("ISO-8859-1");
+
+       private ORExpression _orExpr;
+       private EntityDefinition _ed;
+       private boolean _filterIfMissing;
+       private Charset _charset = _defaultCharset;
+
+       /**
+        * TODO: Verify performance impact
+        *
+        * @return
+        */
+       public Set<String> getFilterFields() {
+               return _filterFields;
+       }
+
+       /**
+        * Just add filter fields for expression filter
+        */
+       private Set<String> _filterFields;
+
+       public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr) {
+               this(ed, orExpr, false);
+       }
+       
+       public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr, boolean filterIfMissing) {
+               this._ed = ed;
+               this._orExpr = orExpr;
+               this._filterIfMissing = filterIfMissing;
+       }
+       
+       public void setCharset(String charsetName){
+               _charset = Charset.forName(charsetName);
+       }
+       
+       public Charset getCharset(){
+               return _charset;
+       }
+       
+       /**
+        * Because we don't have metadata for tags, we regard any non-qualifier field as a tag, so a field may not be a real tag
+        * even when this method returns true. This happens when a user inputs a wrong field name that is neither a tag nor a qualifier.
+        *
+        * @param field
+        */
+       private boolean isTag(String field){
+               return _ed.isTag(field);
+       }
+       
+       /**
+        * Check whether this field is an entity attribute or not.
+        * @param fieldName
+        * @return
+        */
+       private String parseEntityAttribute(String fieldName){
+               Matcher m = _fnPattern.matcher(fieldName);
+               if(m.find()){
+                       return m.group(1);
+               }
+               return null;
+       }
+
+       /**
+        * Return the partition values for each OR expression. The size of the returned list should be equal to
+        * the size of the FilterList that {@link #buildFilters()} returns.
+        *
+        * TODO: For now one query cannot span multiple partitions. In the future, if partitions are defined
+        * for the entity, we need to internally spawn multiple queries and send one query for each partition.
+        *
+        * @return the partition values for each OR expression, or null if the entity doesn't support partitions
+        */
+       public List<String[]> getPartitionValues() {
+               final String[] partitions = _ed.getPartitions();
+               if (partitions == null || partitions.length == 0) {
+                       return null;
+               }
+               final List<String[]> result = new ArrayList<String[]>();
+               final Map<String, String> partitionKeyValueMap = new HashMap<String, String>();
+               for(ANDExpression andExpr : _orExpr.getANDExprList()) {
+                       partitionKeyValueMap.clear();
+                       for(AtomicExpression ae : andExpr.getAtomicExprList()) {
+                               // TODO temporarily ignore those fields which are not attributes
+                               if(ae.getKeyType() == TokenType.ID) {
+                                       final String fieldName = parseEntityAttribute(ae.getKey());
+                                       if (fieldName == null) {
+                                               LOG.warn(ae.getKey() + " field does not have format @<FieldName>, ignored");
+                                               continue;
+                                       }
+                                       if (_ed.isPartitionTag(fieldName) && ComparisonOperator.EQUAL.equals(ae.getOp())) {
+                                               final String value = ae.getValue();
+                                               partitionKeyValueMap.put(fieldName, value);
+                                       }
+                               }
+                       }
+                       final String[] values = new String[partitions.length];
+                       result.add(values);
+                       for (int i = 0; i < partitions.length; ++i) {
+                               final String partition = partitions[i];
+                               final String value = partitionKeyValueMap.get(partition);
+                               values[i] = value;
+                       }
+               }
+               return result;
+       }
+
+       /**
+        * @see org.apache.eagle.query.parser.TokenType
+        *
+        * @return
+        */
+       public FilterList buildFilters(){
+               // TODO: Optimize to select between row filter or column filter for better performance
+               // Use row key filter priority by default
+               boolean rowFilterPriority = true;
+
+               FilterList fltList = new FilterList(Operator.MUST_PASS_ONE);
+               for(ANDExpression andExpr : _orExpr.getANDExprList()){
+                       
+                       FilterList list = new FilterList(Operator.MUST_PASS_ALL);
+                       Map<String, List<String>> tagFilters = new HashMap<String, List<String>>();
+                       List<QualifierFilterEntity> qualifierFilters = new ArrayList<QualifierFilterEntity>();
+//                     List<QualifierFilterEntry> tagLikeQualifierFilters = new ArrayList<QualifierFilterEntry>();
+
+                       // TODO refactor to avoid too much if/else
+                       for(AtomicExpression ae : andExpr.getAtomicExprList()){
+                               // TODO temporarily ignore those fields which are not attributes
+
+                               String fieldName = ae.getKey();
+                               if(ae.getKeyType() == TokenType.ID){
+                                       fieldName = parseEntityAttribute(fieldName);
+                                       if(fieldName == null){
+                                               LOG.warn(ae.getKey() + " field does not have format @<FieldName>, ignored");
+                                               continue;
+                                       }
+                               }
+
+                               String value = ae.getValue();
+                               ComparisonOperator op = ae.getOp();
+                               TokenType keyType = ae.getKeyType();
+                               TokenType valueType = ae.getValueType();
+                               QualifierFilterEntity entry = new QualifierFilterEntity(fieldName, value, op, keyType, valueType);
+
+                               // TODO Exact match, need to add escape for those special characters here, including:
+                               // "-", "[", "]", "/", "{", "}", "(", ")", "*", "+", "?", ".", "\\", "^", "$", "|"
+
+                               if(keyType == TokenType.ID && isTag(fieldName)){
+                                       if ((ComparisonOperator.EQUAL.equals(op) || ComparisonOperator.IS.equals(op))
+                                                       && !TokenType.NULL.equals(valueType)) {
+                                               // Use RowFilter for equal TAG
+                                               if(tagFilters.get(fieldName) == null) tagFilters.put(fieldName, new ArrayList<String>());
+                                               tagFilters.get(fieldName).add(value);
+                                       } else if (rowFilterPriority && ComparisonOperator.IN.equals(op)) {
+                                               // Use RowFilter here by default
+                                               if(tagFilters.get(fieldName) == null) tagFilters.put(fieldName, new ArrayList<String>());
+                                               tagFilters.get(fieldName).addAll(EntityQualifierUtils.parseList(value));
+                                       } else if (ComparisonOperator.LIKE.equals(op)
+                                               || ComparisonOperator.NOT_LIKE.equals(op)
+                                               || ComparisonOperator.CONTAINS.equals(op)
+                                               || ComparisonOperator.NOT_CONTAINS.equals(op)
+                                               || ComparisonOperator.IN.equals(op)
+                                               || ComparisonOperator.IS.equals(op)
+                                               || ComparisonOperator.IS_NOT.equals(op)
+                                               || ComparisonOperator.NOT_EQUAL.equals(op)
+                                               || ComparisonOperator.EQUAL.equals(op)
+                                               || ComparisonOperator.NOT_IN.equals(op)) {
+                                               qualifierFilters.add(entry);
+                                       } else {
+                                               LOG.warn("Unsupported operation: \"" + op + "\" on tag field: " + fieldName + ", going to ignore");
+                                               throw new IllegalArgumentException("Unsupported operation: " + op + " on tag field: " + fieldName + ", available options: =, !=, =~, !=~, in, not in, contains, not contains");
+                                       }
+                               }else{
+                                       qualifierFilters.add(entry);
+                               }
+                       }
+
+                       // Build RowFilter for equal tags
+                       list.addFilter(buildTagFilter(tagFilters));
+
+                       // Build SingleColumnValueFilter
+                       FilterList qualifierFilterList = buildQualifierFilter(qualifierFilters);
+                       if(qualifierFilterList != null && qualifierFilterList.getFilters().size() > 0){
+                               list.addFilter(qualifierFilterList);
+                       }else {
+                               if(LOG.isDebugEnabled()) LOG.debug("Ignore empty qualifier filter from " + qualifierFilters.toString());
+                       }
+                       fltList.addFilter(list);
+               }
+               LOG.info("Query: " + _orExpr.toString() + " => Filter: " + fltList.toString());
+               return fltList;
+       }
+       
+       /**
+        * _charset is used to decode the byte array; on the HBase server, RegexStringComparator uses the same
+        * charset to decode the byte array stored in the qualifier.
+        * For the tag filter regex it is always ISO-8859-1, as it only comes from String's hashcode (Integer).
+        * Note: regex comparison compares Strings.
+        */
+       protected Filter buildTagFilter(Map<String, List<String>> tagFilters){
+               RegexStringComparator regexStringComparator = new RegexStringComparator(buildTagFilterRegex(tagFilters));
+               regexStringComparator.setCharset(_charset);
+               RowFilter filter = new RowFilter(CompareOp.EQUAL, regexStringComparator);
+               return filter;
+       }
+       
+       /**
+        * All qualifiers' conditions must be satisfied.
+        *
+        * <H1>Use RegexStringComparator for:</H1>
+        *      IN
+        *      LIKE
+        *      NOT_LIKE
+        *
+        * <H1>Use SubstringComparator for:</H1>
+        *      CONTAINS
+        *
+        * <H1>Use EntityQualifierHelper for:</H1>
+        *      EQUALS
+        *      NOT_EQUALS
+        *      LESS
+        *      LESS_OR_EQUAL
+        *      GREATER
+        *      GREATER_OR_EQUAL
+        *
+        * <H2>
+        *     TODO: Compare performance of RegexStringComparator, SubstringComparator and EntityQualifierHelper
+        * </H2>
+        *
+        * @param qualifierFilters
+        * @return
+        */
+       protected FilterList buildQualifierFilter(List<QualifierFilterEntity> qualifierFilters){
+               FilterList list = new FilterList(Operator.MUST_PASS_ALL);
+               // iterate all the qualifiers
+               for(QualifierFilterEntity entry : qualifierFilters){
+                       // if contains expression based filter
+                       if(entry.getKeyType() == TokenType.EXP
+                                       || entry.getValueType() == TokenType.EXP
+                                       || entry.getKeyType() != TokenType.ID){
+                               if(!EagleConfigFactory.load().isCoprocessorEnabled()) {
+                                       LOG.warn("Expression in filter may not be supported, because custom filter and coprocessor are disabled: " + entry.toString());
+                               }
+                               list.addFilter(buildExpressionBasedFilter(entry));
+                               continue;
+                       }
+
+                       // else using SingleColumnValueFilter
+                       String qualifierName = entry.getKey();
+                       if(!isTag(entry.getKey())){
+                               Qualifier qualifier = _ed.getDisplayNameMap().get(entry.getKey());
+                               qualifierName = qualifier.getQualifierName();
+                       }
+
+                       // Comparator to be used for building HBase Filter
+                       // WritableByteArrayComparable comparator;
+                       ByteArrayComparable comparable;
+                       if(ComparisonOperator.IN.equals(entry.getOp())
+                               || ComparisonOperator.NOT_IN.equals(entry.getOp())){
+                               Filter setFilter = buildListQualifierFilter(entry);
+                               if(setFilter!=null){
+                                       list.addFilter(setFilter);
+                               }
+                       }else{
+                               // If [=, !=, is, is not] NULL, use NullComparator, else throw exception
+                               if(TokenType.NULL.equals(entry.getValueType())){
+                                       if(ComparisonOperator.EQUAL.equals(entry.getOp())
+                                               || ComparisonOperator.NOT_EQUAL.equals(entry.getOp())
+                                               || ComparisonOperator.IS.equals(entry.getOp())
+                                               || ComparisonOperator.IS_NOT.equals(entry.getOp()))
+                                               comparable = new NullComparator();
+                                       else
+                                               throw new IllegalArgumentException("Operation: " + entry.getOp() + " with NULL is not supported yet: " + entry.toString() + ", available options: [=, !=, is, is not] null|NULL");
+                               }
+                               // If [contains, not contains], use SubstringComparator
+                               else if (ComparisonOperator.CONTAINS.equals(entry.getOp())
+                                       || ComparisonOperator.NOT_CONTAINS.equals(entry.getOp())) {
+                                       comparable = new SubstringComparator(entry.getValue());
+                               }
+                               // If [like, not like], use RegexStringComparator
+                               else if (ComparisonOperator.LIKE.equals(entry.getOp())
+                                               || ComparisonOperator.NOT_LIKE.equals(entry.getOp())){
+                                       // Use RegexStringComparator for LIKE / NOT_LIKE
+                                       RegexStringComparator _comparator = new RegexStringComparator(buildQualifierRegex(entry.getValue()));
+                                       _comparator.setCharset(_charset);
+                                       comparable = _comparator;
+                               } else{
+                                       Class type = EntityQualifierUtils.getType(_ed, entry.getKey());
+                                       // if type is null (is Tag or not found) or not defined for TypedByteArrayComparator
+                                       if(!EagleConfigFactory.load().isCoprocessorEnabled() || type == null || TypedByteArrayComparator.get(type) == null){
+                                               comparable = new BinaryComparator(EntityQualifierUtils.toBytes(_ed, entry.getKey(), entry.getValue()));
+                                       }else {
+                                               comparable = new TypedByteArrayComparator(EntityQualifierUtils.toBytes(_ed, entry.getKey(), entry.getValue()), type);
+                                       }
+                               }
+
+                               SingleColumnValueFilter filter =
+                                               new SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), qualifierName.getBytes(), convertToHBaseCompareOp(entry.getOp()), comparable);
+                               filter.setFilterIfMissing(_filterIfMissing);
+                               list.addFilter(filter);
+                       }
+               }
+
+               return list;
+       }
+
+       private Filter buildExpressionBasedFilter(QualifierFilterEntity entry) {
+               BooleanExpressionComparator expressionComparator = new BooleanExpressionComparator(entry, _ed);
+               _filterFields = expressionComparator.getRequiredFields();
+               RowValueFilter filter = new RowValueFilter(expressionComparator);
+               return filter;
+       }
+
+       /**
+        * Currently uses BinaryComparator only.
+        * <h2>TODO: </h2>
+        * Possibly tune performance by using OR[BinaryComparator,...] instead of RegexStringComparator?
+        *
+        *<br/> <br/>
+        *
+        * Note: the caller must ensure op is IN or NOT_IN.
+        *
+        * @param entry
+        * @return
+        */
+       private Filter buildListQualifierFilter(QualifierFilterEntity entry){
+               List<String> valueSet = EntityQualifierUtils.parseList(entry.getValue());
+               Iterator<String> it = valueSet.iterator();
+               String fieldName = entry.getKey();
+               String qualifierName = fieldName;
+               if(!_ed.isTag(entry.getKey())){
+                       qualifierName = _ed.getDisplayNameMap().get(entry.getKey()).getQualifierName();
+               }
+
+// TODO: Try to use a regex where possible,
+// because a single SingleColumnValueFilter is much faster than multiple SingleColumnValueFilters in an OR list.
+//             Class qualifierType = 
EntityQualifierHelper.getType(_ed,fieldName);
+//             if( qualifierType == null || qualifierType == String.class){
+//                     boolean first = true;
+//                     StringBuilder filterRegex = new StringBuilder();
+//                     filterRegex.append("^(");
+//                     while(it.hasNext()) {
+//                             String value = it.next();
+//                             if(value == null) {
+//                                     logger.warn("ignore empty value in set 
qualifier filter: "+entry.toString());
+//                                     continue;
+//                             }
+//                             if(!first) filterRegex.append("|");
+//                             filterRegex.append(value);
+//                             first = false;
+//                     }
+//                     filterRegex.append(")$");
+//                     RegexStringComparator regexStringComparator = new 
RegexStringComparator(filterRegex.toString());
+//                     return new 
SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), 
qualifierName.getBytes(),
+//                                     convertToHBaseCompareOp(entry.getOp()), 
regexStringComparator);
+//             }else{
+               FilterList setFilterList;
+               if(ComparisonOperator.IN.equals(entry.getOp())){
+                       setFilterList = new FilterList(Operator.MUST_PASS_ONE);
+               }else if(ComparisonOperator.NOT_IN.equals(entry.getOp())) {
+                       setFilterList = new FilterList(Operator.MUST_PASS_ALL);
+               }else{
+                       throw new IllegalArgumentException(String.format("Don't 
support operation: %s on LIST type of value yet: %s, valid options: IN/NOT IN 
[LIST]",entry.getOp(),entry.toString()));
+               }
+
+               while(it.hasNext()) {
+                       String value = it.next();
+                       BinaryComparator comparator = new 
BinaryComparator(EntityQualifierUtils.toBytes(_ed, fieldName, value));
+                       SingleColumnValueFilter filter =
+                                       new 
SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), 
qualifierName.getBytes(), convertToHBaseCompareOp(entry.getOp()), comparator);
+                       filter.setFilterIfMissing(_filterIfMissing);
+                       setFilterList.addFilter(filter);
+               }
+
+               return setFilterList;
+//             }
+       }
+
+       /**
+        * Just used for LIKE and NOT_LIKE
+        *
+        * @param qualifierValue
+        * @return
+        */
+       protected String buildQualifierRegex(String qualifierValue){
+               StringBuilder sb = new StringBuilder();
+//             sb.append("(?s)");
+               sb.append("^");
+               sb.append(qualifierValue);
+               sb.append("$");
+               return sb.toString();
+       }
+       
+         /**
+          * Appends the given ID to the given buffer, followed by "\\E".
+          * [adapted from OpenTSDB, thanks OpenTSDB :) https://github.com/OpenTSDB/opentsdb/blob/master/src/core/TsdbQuery.java]
+          */
+         private static void addId(final StringBuilder buf, final byte[] id) {
+               buf.append("\\Q");
+           boolean backslash = false;
+           for (final byte b : id) {
+             buf.append((char) (b & 0xFF));
+             if (b == 'E' && backslash) {  // If we saw a `\' and now we have a `E'.
+               // So we just terminated the quoted section because we just added \E
+               // to `buf'.  So let's put a literal \E now and start quoting again.
+               buf.append("\\\\E\\Q");
+             } else {
+               backslash = b == '\\';
+             }
+           }
+           buf.append("\\E");
+         }
+         
+         @SuppressWarnings("unused")
+         private static void addId(final StringBuilder buf, final String id) {
+                   buf.append("\\Q");
+                       int len = id.length()-1;
+                   boolean backslash = false;
+                   for (int i =0; i < len; i++) {
+                     char c = id.charAt(i);
+                     buf.append(c);
+                     if (c == 'E' && backslash) {  // If we saw a `\' and now we have a `E'.
+                       // So we just terminated the quoted section because we just added \E
+                       // to `buf'.  So let's put a literal \E now and start quoting again.
+                       buf.append("\\\\E\\Q");
+                     } else {
+                       backslash = c == '\\';
+                     }
+                   }
+                   buf.append("\\E");
+                 }
+       
+       /**
+        * One search tag may have multiple values with an OR relationship, while the relationship between
+        * different search tags is AND, e.g. the query "(TAG1=value11 OR TAG1=value12) AND TAG2=value2".
+        * @param tags
+        * @return
+        */
+       protected String buildTagFilterRegex(Map<String, List<String>> tags){
+               // TODO: need to consider that \E could be part of a tag, refer to https://github.com/OpenTSDB/opentsdb/blob/master/src/core/TsdbQuery.java
+               final SortedMap<Integer, List<Integer>> tagHash = new TreeMap<Integer, List<Integer>>();
+               final int numOfPartitionFields = (_ed.getPartitions() == null) ? 0 : _ed.getPartitions().length;
+               for(Map.Entry<String, List<String>> entry : tags.entrySet()){
+                       String tagName = entry.getKey();
+                       // Ignore tag if the tag is one of partition fields
+                       if (_ed.isPartitionTag(tagName)) {
+                               continue;
+                       }
+                       List<String> stringValues = entry.getValue();
+                       List<Integer> hashValues = new ArrayList<Integer>(stringValues.size());
+                       for(String value : stringValues){
+                               hashValues.add(value.hashCode());
+                       }
+                       tagHash.put(tagName.hashCode(), hashValues);
+               }
+               
+               // header = prefix(4 bytes) + partition_hashes(4*N bytes) + timestamp (8 bytes)
+               final int headerLength = 4 + numOfPartitionFields * 4 + 8;
+
+               // <tag1:4><value1:4> ... <tagn:4><valuen:4>
+               StringBuilder sb = new StringBuilder();
+               sb.append("(?s)");
+               sb.append("^(?:.{").append(headerLength).append("})");
+               sb.append("(?:.{").append(8).append("})*"); // for any number of tags
+               for (Map.Entry<Integer, List<Integer>> entry : tagHash.entrySet()) {
+                       try {
+                               addId(sb, ByteUtil.intToBytes(entry.getKey()));
+                               List<Integer> hashValues = entry.getValue();
+                               sb.append("(?:");
+                               boolean first = true;
+                               for(Integer value : hashValues){
+                                       if(!first){
+                                               sb.append('|');
+                                       }
+                                       addId(sb, ByteUtil.intToBytes(value));
+                                       first = false;
+                               }
+                               sb.append(")");
+                               sb.append("(?:.{").append(8).append("})*"); // for any number of tags
+                       } catch (Exception ex) {
+                               LOG.error("constructing regex error", ex);
+                       }
+               }
+               sb.append("$");
+               if(LOG.isDebugEnabled()) LOG.debug("Tag filter pattern is " + sb.toString());
+               return sb.toString();
+       }
+
+       /**
+        * Convert ComparisonOperator to the native HBase CompareOp
+        *
+        * Supports:
+        *      =, =~, CONTAINS, <, <=, >, >=, !=, !=~
+        *
+        * @param comp
+        * @return
+        */
+       protected static CompareOp convertToHBaseCompareOp(ComparisonOperator comp) {
+               if(comp == ComparisonOperator.EQUAL || comp == ComparisonOperator.LIKE
+                               || comp == ComparisonOperator.CONTAINS
+                               || comp == ComparisonOperator.IN
+                               || comp == ComparisonOperator.IS
+                               ) {
+                       return CompareOp.EQUAL;
+               }else if(comp == ComparisonOperator.LESS) {
+                       return CompareOp.LESS;
+               } else if(comp == ComparisonOperator.LESS_OR_EQUAL){
+                       return CompareOp.LESS_OR_EQUAL;
+               }else if(comp == ComparisonOperator.GREATER) {
+                       return CompareOp.GREATER;
+               } else if(comp == ComparisonOperator.GREATER_OR_EQUAL){
+                       return CompareOp.GREATER_OR_EQUAL;
+               } else if(comp == ComparisonOperator.NOT_EQUAL
+                               || comp == ComparisonOperator.NOT_LIKE
+                               || comp == ComparisonOperator.NOT_CONTAINS
+                               || comp == ComparisonOperator.IS_NOT
+                               || comp == ComparisonOperator.NOT_IN)
+               {
+                       return CompareOp.NOT_EQUAL;
+               } else {
+                       LOG.error("{} operation is not supported now", comp);
+                       throw new IllegalArgumentException("Illegal operation: " + comp + ", available options: " + Arrays.toString(ComparisonOperator.values()));
+               }
+       }
+
+       protected static CompareOp getHBaseCompareOp(String comp) {
+               return convertToHBaseCompareOp(ComparisonOperator.locateOperator(comp));
+       }
+}
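
For reference, a minimal usage sketch of how HBaseFilterBuilder is typically driven from the query layer. The EagleQueryParser call and the entity-definition lookup are assumptions drawn from the surrounding eagle-query modules, not part of this diff:

    // Hypothetical driver code; the parser and entity lookup APIs are assumed, not shown in this patch.
    EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestTimeSeriesAPIEntity"); // assumed lookup
    ORExpression orExpr = new EagleQueryParser("@cluster=\"cluster1\" AND @datacenter=\"dc1\"").parse(); // assumed API
    HBaseFilterBuilder builder = new HBaseFilterBuilder(ed, orExpr);
    FilterList filters = builder.buildFilters();              // top-level list is MUST_PASS_ONE across AND-expressions
    List<String[]> partitions = builder.getPartitionValues(); // null when the entity defines no partitions
    Scan scan = new Scan();
    scan.setFilter(filters);                                  // hand the filter tree to a normal HBase scan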

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java
new file mode 100755
index 0000000..6cdc77b
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.filter;
+
+import org.apache.eagle.query.parser.ComparisonOperator;
+import org.apache.eagle.query.parser.TokenType;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class QualifierFilterEntity implements Writable{
+       public String key;
+       public String value;
+       public ComparisonOperator op;
+       public TokenType valueType;
+       public TokenType keyType;
+
+       public QualifierFilterEntity(){}
+       public QualifierFilterEntity(String key, String value, ComparisonOperator comp, TokenType keyType, TokenType valueType) {
+               super();
+               this.key = key;
+               this.value = value;
+               this.op = comp;
+               this.keyType = keyType;
+               this.valueType = valueType;
+       }
+
+       public String getKey() {
+               return key;
+       }
+
+       public void setKey(String key) {
+               this.key = key;
+       }
+
+       public String getValue() {
+               return value;
+       }
+
+       public void setValue(String value) {
+               this.value = value;
+       }
+
+       public ComparisonOperator getOp() {
+               return op;
+       }
+
+       public void setOp(ComparisonOperator op) {
+               this.op = op;
+       }
+
+       public TokenType getValueType() {
+               return valueType;
+       }
+
+       public void setValueType(TokenType valueType) {
+               this.valueType = valueType;
+       }
+
+       public void setKeyType(TokenType keyType){
+               this.keyType = keyType;
+       }
+       public TokenType getKeyType(){
+               return this.keyType;
+       }
+
+       @Override
+       public String toString() {
+               return String.format("%s %s %s",this.key,this.op,this.value);
+       }
+
+       @Override
+       public void write(DataOutput out) throws IOException {
+               out.writeUTF(this.key);
+               out.writeUTF(this.getValue());
+               out.writeUTF(this.op.name());
+               out.writeUTF(this.keyType.name());
+               out.writeUTF(this.valueType.name());
+       }
+
+       @Override
+       public void readFields(DataInput in) throws IOException {
+               this.key = in.readUTF();
+               this.value = in.readUTF();
+               this.op = ComparisonOperator.valueOf(in.readUTF());
+               this.keyType = TokenType.valueOf(in.readUTF());
+               this.valueType = TokenType.valueOf(in.readUTF());
+       }
+}
\ No newline at end of file
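
A quick sketch of the Writable round trip QualifierFilterEntity supports. Only enum constants visible in this diff are used; ByteArrayDataOutput/ByteStreams are the Guava helpers already imported by the sibling filter classes:

    QualifierFilterEntity entity = new QualifierFilterEntity("@field1", "null", ComparisonOperator.EQUAL, TokenType.ID, TokenType.NULL);
    ByteArrayDataOutput out = ByteStreams.newDataOutput();
    entity.write(out);                                   // key, value, op, keyType, valueType written as UTF strings
    QualifierFilterEntity copy = new QualifierFilterEntity();
    copy.readFields(ByteStreams.newDataInput(out.toByteArray()));
    assert ComparisonOperator.EQUAL == copy.getOp() && TokenType.NULL == copy.getValueType();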

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java
new file mode 100755
index 0000000..a4b97ea
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.filter;
+
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.io.WritableComparable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * TODO: Critical performance problem!!!
+ * TODO: Refactor to a dedicated multi-column filter to avoid returning all qualifier columns from the region server to the client side
+ *
+ * @since 2014/11/17
+ */
+public class RowValueFilter extends FilterBase {
+    private final static Logger LOG = LoggerFactory.getLogger(RowValueFilter.class);
+    private boolean filterOutRow = false;
+    private WritableComparable<List<KeyValue>> comparator;
+
+    // TODO: Use qualifiers to reduce network transfer
+//    private List<byte[]> qualifiers;
+    public RowValueFilter(){}
+
+    /**
+     * Filter out the row if WritableComparable.compareTo returns 0
+     * @param comparator <code>WritableComparable[List[KeyValue]]</code>
+     */
+    public RowValueFilter(WritableComparable<List<KeyValue>> comparator){
+        this.comparator = comparator;
+    }
+
+//    public RowValueFilter(List<byte[]> 
qualifiers,WritableComparable<List<KeyValue>> comparator){
+//        this.qualifiers = qualifiers;
+//        this.comparator = comparator;
+//    }
+
+    /**
+     * Old interface in hbase-0.94
+     *
+     * @param out
+     * @throws IOException
+     */
+    @Deprecated
+    public void write(DataOutput out) throws IOException {
+        this.comparator.write(out);
+    }
+
+    /**
+     * Old interface in hbase-0.94
+     *
+     * @param in
+     * @throws IOException
+     */
+//    @Override
+    @Deprecated
+    public void readFields(DataInput in) throws IOException {
+        this.comparator = new BooleanExpressionComparator();
+        this.comparator.readFields(in);
+    }
+
+    /**
+     * TODO: Currently still uses the older serialization method from hbase-0.94; needs to migrate to a protobuf-based approach
+     *
+     * @return
+     * @throws IOException
+     */
+    @Override
+    public byte[] toByteArray() throws IOException {
+        ByteArrayDataOutput byteArrayDataOutput = ByteStreams.newDataOutput();
+        this.comparator.write(byteArrayDataOutput);
+        return byteArrayDataOutput.toByteArray();
+    }
+
+    /**
+     * TODO: Currently still uses the older serialization method from hbase-0.94; needs to migrate to a protobuf-based approach
+     */
+    // Override static method
+    public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException {
+        ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(pbBytes);
+        RowValueFilter filter = new RowValueFilter();
+        try {
+            filter.readFields(byteArrayDataInput);
+        } catch (IOException e) {
+            LOG.error("Got error to deserialize RowValueFilter from PB 
bytes",e);
+            throw new DeserializationException(e);
+        }
+        return filter;
+    }
+
+    @Override
+    public boolean hasFilterRow(){
+        return true;
+    }
+
+    @Override
+    public void filterRow(List<KeyValue> row) {
+        filterOutRow = (this.comparator.compareTo(row) == 0);
+    }
+
+    @Override
+    public void reset() {
+        this.filterOutRow = false;
+    }
+
+    @Override
+    public boolean filterRow(){
+        return filterOutRow;
+    }
+
+    @Override
+    public String toString() {
+        return super.toString()+" ( "+this.comparator.toString()+" )";
+    }
+
+//    public List<byte[]> getQualifiers() {
+//        return qualifiers;
+//    }
+}
\ No newline at end of file
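
To illustrate the 0.94-style serialization path RowValueFilter still relies on, a hedged sketch of the client/region-server round trip; the entry and ed values are assumed to come from HBaseFilterBuilder.buildExpressionBasedFilter above and are not constructed here:

    // entry: a QualifierFilterEntity carrying an expression; ed: the EntityDefinition (both assumed)
    RowValueFilter filter = new RowValueFilter(new BooleanExpressionComparator(entry, ed));
    byte[] bytes = filter.toByteArray();               // client side: comparator.write(...) into a byte[]
    Filter restored = RowValueFilter.parseFrom(bytes); // region-server side: readFields(...) rebuilds the comparator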

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java
new file mode 100755
index 0000000..ecaf8cc
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.filter;
+
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.io.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <h1>TypedByteArrayComparator</h1>
+ *
+ * Compare a byte array <code>byte[] value</code> according to a class type <code>Class type</code>
+ *
+ * <br/>
+ * <br/>
+ * Built-in support:
+ *
+ *  <pre>
+ *    Double
+ *    double
+ *    Integer
+ *    int
+ *    Long
+ *    long
+ *    Short
+ *    short
+ *    Boolean
+ *    boolean
+ *  </pre>
+ *
+ *  It can be extended by defining a new {@link RawComparator} and registering it with {@link #define(Class type, RawComparator comparator)}
+ * <br/>
+ * <br/>
+ */
+public class TypedByteArrayComparator extends ByteArrayComparable {
+    private final static Logger LOG = LoggerFactory.getLogger(TypedByteArrayComparator.class);
+
+    private Class type;
+
+    // Does not need to be writable
+    private RawComparator comparator;
+
+    /**
+     * Default constructor for writable
+     */
+    @SuppressWarnings("unused")
+    public TypedByteArrayComparator(){
+        super(null);
+    }
+
+    public TypedByteArrayComparator(byte[] value, Class type){
+        super(value);
+        this.type = type;
+        this.comparator = get(this.type);
+        if(this.comparator == null) throw new IllegalArgumentException("No 
comparator found for class: "+type);
+    }
+
+    /**
+     * @param in hbase-0.94 interface
+     * @throws IOException
+     */
+//    @Override
+    public void readFields(DataInput in) throws IOException {
+//        super.readFields(in);
+        try {
+            String _type = in.readUTF();
+            type = _primitiveTypeClassMap.get(_type);
+            if(type == null) {
+                type = Class.forName(_type);
+            }
+            comparator = get(type);
+            if(comparator == null) throw new IllegalArgumentException("No 
comparator found for class: "+type);
+        } catch (ClassNotFoundException e) {
+            throw new IOException(e.getMessage(),e);
+        }
+    }
+
+    /**
+     * @param out hbase-0.94 interface
+     * @throws IOException
+     */
+//    @Override
+    public void write(DataOutput out) throws IOException {
+//        super.write(out);
+        String typeName = type.getName();
+        out.writeUTF(typeName);
+    }
+
+    /**
+     * For hbase 0.98
+     *
+     * @return serialized byte array
+     */
+    @Override
+    public byte[] toByteArray() {
+        ByteArrayDataOutput byteArrayDataOutput = ByteStreams.newDataOutput();
+        try {
+            this.write(byteArrayDataOutput);
+            return byteArrayDataOutput.toByteArray();
+        } catch (IOException e) {
+            LOG.error("Failed to serialize due to: "+e.getMessage(),e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * For hbase 0.98
+     *
+     * @param bytes raw byte array
+     * @return Comparator instance
+     * @throws DeserializationException
+     */
+    public static TypedByteArrayComparator parseFrom(final byte [] bytes)
+            throws DeserializationException {
+        TypedByteArrayComparator comparator = new TypedByteArrayComparator();
+        ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(bytes);
+        try {
+            comparator.readFields(byteArrayDataInput);
+        } catch (IOException e) {
+            LOG.error("Got error to deserialize TypedByteArrayComparator from 
PB bytes",e);
+            throw new DeserializationException(e);
+        }
+        return comparator;
+    }
+
+    @Override
+    public int compareTo(byte[] value, int offset, int length) {
+        return this.comparator.compare(this.getValue(), 0, this.getValue().length, value, offset, length);
+    }
+
+    /**
+     * <ol>
+     * <li>Try the registered comparator</li>
+     * <li>If not found, try all possible WritableComparators</li>
+     * </ol>
+     *
+     * If still not found, return null; callers then throw IllegalArgumentException("No comparator found for class: " + type).
+     *
+     * @param type value type class
+     * @return RawComparator, or null if none is found
+     */
+    public static RawComparator get(Class type){
+        RawComparator comparator = null;
+        try {
+            comparator = _typedClassComparator.get(type);
+        }catch (ClassCastException ex){
+            // ignore
+        }
+        try {
+            if (comparator == null) comparator = WritableComparator.get(type);
+        }catch (ClassCastException ex){
+            // ignore
+        }
+        return comparator;
+    }
+
+    private final static Map<Class, RawComparator> _typedClassComparator = new HashMap<Class, RawComparator>();
+    public static void define(Class type, RawComparator comparator){
+        _typedClassComparator.put(type,comparator);
+    }
+
+    static{
+        define(Double.class, WritableComparator.get(DoubleWritable.class));
+        define(double.class, WritableComparator.get(DoubleWritable.class));
+        define(Integer.class, WritableComparator.get(IntWritable.class));
+        define(int.class, WritableComparator.get(IntWritable.class));
+        define(Long.class, WritableComparator.get(LongWritable.class));
+        define(long.class, WritableComparator.get(LongWritable.class));
+        define(Short.class, WritableComparator.get(ShortWritable.class));
+        define(short.class, WritableComparator.get(ShortWritable.class));
+        define(Boolean.class, WritableComparator.get(BooleanWritable.class));
+        define(boolean.class, WritableComparator.get(BooleanWritable.class));
+    }
+
+    /**
+     * Because {@link Class#forName} can't resolve classes for primitive types
+     */
+    private final static Map<String, Class> _primitiveTypeClassMap = new HashMap<String, Class>();
+    static {
+        _primitiveTypeClassMap.put(int.class.getName(),int.class);
+        _primitiveTypeClassMap.put(double.class.getName(),double.class);
+        _primitiveTypeClassMap.put(long.class.getName(),long.class);
+        _primitiveTypeClassMap.put(short.class.getName(),short.class);
+        _primitiveTypeClassMap.put(boolean.class.getName(),boolean.class);
+        _primitiveTypeClassMap.put(char.class.getName(),char.class);
+        _primitiveTypeClassMap.put(byte.class.getName(),byte.class);
+    }
+}
\ No newline at end of file
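
A short sketch of how TypedByteArrayComparator is meant to be used, assuming the stored qualifier bytes are the standard 8-byte big-endian encoding of a long (in the real code path the bytes come from EntityQualifierUtils.toBytes):

    byte[] threshold = Bytes.toBytes(100L);                       // org.apache.hadoop.hbase.util.Bytes
    TypedByteArrayComparator cmp = new TypedByteArrayComparator(threshold, long.class);
    int r = cmp.compareTo(Bytes.toBytes(150L), 0, 8);             // negative: 100L < 150L
    // Additional types could be registered via define(), e.g. (assumed, not part of this diff):
    // TypedByteArrayComparator.define(float.class, WritableComparator.get(FloatWritable.class));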

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java
new file mode 100755
index 0000000..418ab33
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import org.apache.eagle.log.entity.LogReader;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+
+public abstract class IndexLogReader implements LogReader {
+
+       // TODO: Work around https://issues.apache.org/jira/browse/HBASE-2198. A more graceful implementation would use SingleColumnValueExcludeFilter,
+       // but that is complicated in the current implementation.
+       protected static void workaroundHBASE2198(Get get, Filter filter, byte[][] qualifiers) {
+               if (filter instanceof SingleColumnValueFilter) {
+                       if(qualifiers == null) {
+                               get.addFamily(((SingleColumnValueFilter) filter).getFamily());
+                       }else{
+                               get.addColumn(((SingleColumnValueFilter) filter).getFamily(), ((SingleColumnValueFilter) filter).getQualifier());
+                       }
+                       return;
+               }
+               if (filter instanceof FilterList) {
+                       for (Filter f : ((FilterList)filter).getFilters()) {
+                               workaroundHBASE2198(get, f,qualifiers);
+                       }
+               }
+       }
+
+}
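
A sketch of what workaroundHBASE2198 does for a concrete Get, as called from a subclass; the family, qualifier and value bytes below are made up for illustration:

    Get get = new Get(rowkey);                                    // rowkey assumed
    SingleColumnValueFilter svf = new SingleColumnValueFilter(
            Bytes.toBytes("f"), Bytes.toBytes("a"), CompareOp.EQUAL, Bytes.toBytes("v"));
    FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL, svf);
    get.setFilter(list);
    workaroundHBASE2198(get, list, null);                         // null qualifiers => get.addFamily("f")
    // with non-null qualifiers it adds only the referenced column => get.addColumn("f", "a")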

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java
new file mode 100755
index 0000000..9e059f2
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.*;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public abstract class IndexStreamReader extends StreamReader {
+       protected final IndexDefinition indexDef;
+       protected final SearchCondition condition;
+       protected final List<byte[]> indexRowkeys;
+       protected LogReader<InternalLog> reader;
+       protected long lastTimestamp = 0;
+       protected long firstTimestamp = 0;
+       
+       protected static final Logger LOG = LoggerFactory.getLogger(IndexStreamReader.class);
+
+       public IndexStreamReader(IndexDefinition indexDef, SearchCondition condition, List<byte[]> indexRowkeys) {
+               this.indexDef = indexDef;
+               this.condition = condition;
+               this.indexRowkeys = indexRowkeys;
+               this.reader = null;
+       }
+
+       @Override
+       public long getLastTimestamp() {
+               return lastTimestamp;
+       }
+
+       @Override
+       public long getFirstTimestamp() {
+               return this.firstTimestamp;
+       }
+
+       @Override
+       public void readAsStream() throws Exception {
+               if (reader == null) {
+                       reader = createIndexReader();
+               }
+               final EntityDefinition entityDef = indexDef.getEntityDefinition();
+               try{
+                       reader.open();
+                       InternalLog log;
+                       int count = 0;
+                       while ((log = reader.read()) != null) {
+                               TaggedLogAPIEntity entity = HBaseInternalLogHelper.buildEntity(log, entityDef);
+                               entity.setSerializeAlias(condition.getOutputAlias());
+                               entity.setSerializeVerbose(condition.isOutputVerbose());
+
+                               if (lastTimestamp == 0 || lastTimestamp < entity.getTimestamp()) {
+                                       lastTimestamp = entity.getTimestamp();
+                               }
+                               if(firstTimestamp == 0 || firstTimestamp > entity.getTimestamp()){
+                                       firstTimestamp = entity.getTimestamp();
+                               }
+                               for(EntityCreationListener l : _listeners){
+                                       l.entityCreated(entity);
+                               }
+                               if(++count == condition.getPageSize()) {
+                                       break;
+                               }
+                       }
+               }catch(IOException ioe){
+                       LOG.error("Fail reading log", ioe);
+                       throw ioe;
+               }finally{
+                       reader.close();
+               }               
+       }
+
+       protected abstract LogReader createIndexReader();
+       
+}
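
For illustration only, a minimal sketch of how a concrete IndexStreamReader subclass is typically consumed. It assumes StreamReader exposes a register(EntityCreationListener) method for attaching listeners (that method is not part of this patch), and the listener shown here simply collects the decoded entities.

    // Hedged sketch: collect entities emitted by readAsStream(); indexDef, condition and
    // indexRowkeys are assumed to have been prepared by the caller.
    final List<TaggedLogAPIEntity> collected = new ArrayList<TaggedLogAPIEntity>();
    StreamReader reader = new UniqueIndexStreamReader(indexDef, condition, indexRowkeys);
    reader.register(new EntityCreationListener() {           // register(...) assumed, not shown in this patch
        @Override
        public void entityCreated(TaggedLogAPIEntity entity) {
            collected.add(entity);                           // fired once per decoded row, up to condition.getPageSize()
        }
    });
    reader.readAsStream();                                   // declares throws Exception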

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java
new file mode 100755
index 0000000..e6a5c96
--- /dev/null
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.InternalLog;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.apache.eagle.common.ByteUtil;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.Filter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class NonClusteredIndexLogReader extends IndexLogReader {
+       private final IndexDefinition indexDef;
+       private final List<byte[]> indexRowkeys;
+       private final byte[][] qualifiers;
+       private final Filter filter;
+       private HTableInterface tbl;
+       private boolean isOpen = false;
+       private Result[] results;
+       private int index = -1;
+       private final List<Scan> scans;
+       private int currentScanIndex = 0;
+       private ResultScanner currentResultScanner;
+
+       // Maximum tag key/value bytes, used as the stop-row suffix.
+       private static final byte[] MAX_TAG_VALUE_BYTES = {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
+       private static final int BATCH_MULTIGET_SIZE = 1000;
+
+       public NonClusteredIndexLogReader(IndexDefinition indexDef, List<byte[]> indexRowkeys, byte[][] qualifiers, Filter filter) {
+               this.indexDef = indexDef;
+               this.indexRowkeys = indexRowkeys;
+               this.qualifiers = qualifiers;
+               this.filter = filter;
+               this.scans = buildScans();
+       }
+
+       private List<Scan> buildScans() {
+               final ArrayList<Scan> result = new ArrayList<Scan>(indexRowkeys.size());
+               for (byte[] rowkey : indexRowkeys) {
+                       Scan s = new Scan();
+                       s.setStartRow(rowkey);
+                       // In the rowkey, tag key/value pairs are sorted by the hash code of the key, so MAX_TAG_VALUE_BYTES is sufficient as the stop key
+                       final byte[] stopRowkey = ByteUtil.concat(rowkey, MAX_TAG_VALUE_BYTES);
+                       s.setStopRow(stopRowkey);
+                       // TODO: the number of cached rows should be the minimum of (page size, 100)
+                       int cs = EagleConfigFactory.load().getHBaseClientScanCacheSize();
+                       s.setCaching(cs);
+                       // TODO not optimized for all applications
+                       s.setCacheBlocks(true);
+                       // scan the specified column family for all qualifiers
+                       s.addFamily(indexDef.getEntityDefinition().getColumnFamily().getBytes());
+                       result.add(s);
+               }
+               return result;
+       }
+
+       @Override
+       public void open() throws IOException {
+               if (isOpen)
+                       return; // silently return
+               try {
+                       tbl = EagleConfigFactory.load().getHTable(indexDef.getEntityDefinition().getTable());
+               } catch (RuntimeException ex) {
+                       throw new IOException(ex);
+               }
+               currentScanIndex = 0;
+               openNewScan();
+               fillResults();
+               // mark the reader as opened so that repeated open() calls remain no-ops
+               isOpen = true;
+       }
+
+       private boolean openNewScan() throws IOException {
+               closeCurrentScanResult();
+               if (currentScanIndex >= scans.size()) {
+                       return false;
+               }
+               final Scan scan = scans.get(currentScanIndex++);
+               currentResultScanner = tbl.getScanner(scan);
+               return true;
+       }
+
+       private void fillResults() throws IOException {
+               if (currentResultScanner == null) {
+                       return;
+               }
+               index = 0;
+               int count = 0;
+               Result r = null;
+               final List<Get> gets = new ArrayList<Get>(BATCH_MULTIGET_SIZE);
+               final byte[] family = indexDef.getEntityDefinition().getColumnFamily().getBytes();
+               while (count < BATCH_MULTIGET_SIZE) {
+                       r = currentResultScanner.next();
+                       if (r == null) {
+                               if (openNewScan()) {
+                                       continue;
+                               } else {
+                                       break;
+                               }
+                       }
+                       for (byte[] rowkey : r.getFamilyMap(family).keySet()) {
+                               if (rowkey.length == 0) {       // invalid rowkey
+                                       continue;
+                               }
+                               final Get get = new Get(rowkey);
+                               if (filter != null) {
+                                       get.setFilter(filter);
+                               }
+                               if (qualifiers != null) {
+                                       for (int j = 0; j < qualifiers.length; ++j) {
+                                               // Return the specified qualifiers
+                                               get.addColumn(family, qualifiers[j]);
+                                       }
+                               } else {
+                                       get.addFamily(family);
+                               }
+                               workaroundHBASE2198(get, filter, qualifiers);
+                               gets.add(get);
+                               ++count;
+                       }
+               }
+               if (count == 0) {
+                       results = null;
+                       return;
+               }
+               results = tbl.get(gets);
+               if (results == null || results.length == 0) {
+                       fillResults();
+               }
+       }
+
+
+       private void closeCurrentScanResult() {
+               if (currentResultScanner != null) {
+                       currentResultScanner.close();
+                       currentResultScanner = null;
+               }
+       }
+
+
+       @Override
+       public void close() throws IOException {
+               if(tbl != null){
+                       new HTableFactory().releaseHTableInterface(tbl);
+               }
+               closeCurrentScanResult();
+       }
+
+       @Override
+       public InternalLog read() throws IOException {
+               if (tbl == null) {
+                       throw new IllegalArgumentException("Reader hasn't been opened before reading");
+               }
+
+               Result r = null;
+               InternalLog t = null;
+               while ((r = getNextResult()) != null) {
+                       if (r.getRow() == null) {
+                               continue;
+                       }
+                       t = HBaseInternalLogHelper.parse(indexDef.getEntityDefinition(), r, qualifiers);
+                       break;
+               }
+               return t;
+       }
+
+
+       private Result getNextResult() throws IOException {
+               if (results == null || results.length == 0 || index >= results.length) {
+                       fillResults();
+               }
+               if (results == null || results.length == 0 || index >= results.length) {
+                       return null;
+               }
+               return results[index++];
+       }
+       
+}
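
For reference, a minimal usage sketch of the reader above, using only the constructor, open(), read() and close() defined in this patch; indexDef and indexRowkeys are assumed to be prepared by the calling stream reader, and process(...) is a placeholder for caller logic.

    // Hedged sketch: drain a NonClusteredIndexLogReader.
    NonClusteredIndexLogReader reader =
            new NonClusteredIndexLogReader(indexDef, indexRowkeys, null /* all qualifiers */, null /* no filter */);
    reader.open();                        // builds one Scan per index rowkey, then resolves entity rows via batched Gets
    try {
        InternalLog log;
        while ((log = reader.read()) != null) {
            process(log);                 // placeholder for caller logic
        }
    } finally {
        reader.close();
    }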

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java
new file mode 100755
index 0000000..ec5631a
--- /dev/null
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.LogReader;
+import org.apache.eagle.log.entity.SearchCondition;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.apache.eagle.log.entity.meta.IndexDefinition.IndexType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class NonClusteredIndexStreamReader extends IndexStreamReader {
+       public NonClusteredIndexStreamReader(IndexDefinition indexDef, SearchCondition condition) {
+               super(indexDef, condition, new ArrayList<byte[]>());
+               final IndexType type = indexDef.canGoThroughIndex(condition.getQueryExpression(), indexRowkeys);
+               if (!IndexType.NON_CLUSTER_INDEX.equals(type)) {
+                       throw new IllegalArgumentException("This query can't go through index: " + condition.getQueryExpression());
+               }
+       }
+
+       public NonClusteredIndexStreamReader(IndexDefinition indexDef, SearchCondition condition, List<byte[]> indexRowkeys) {
+               super(indexDef, condition, indexRowkeys);
+       }
+
+       @Override
+       protected LogReader createIndexReader() {
+               final EntityDefinition entityDef = indexDef.getEntityDefinition();
+               byte[][] outputQualifiers = null;
+               if (!condition.isOutputAll()) {
+                       outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef, condition.getOutputFields());
+               }
+               return new NonClusteredIndexLogReader(indexDef, indexRowkeys, outputQualifiers, condition.getFilter());
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java
new file mode 100755
index 0000000..1c16dc8
--- /dev/null
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.InternalLog;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableFactory;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+
+public class RowKeyLogReader extends IndexLogReader {
+       private final EntityDefinition ed;
+       private final List<byte[]> rowkeys;
+    private final byte[][] qualifiers;
+    private HTableInterface tbl;
+       private boolean isOpen = false;
+       private Result[] entityResult;
+    private int getIndex = -1;
+
+    public RowKeyLogReader(EntityDefinition ed, byte[] rowkey) {
+        this.ed = ed;
+        this.rowkeys = new ArrayList<>();
+        this.rowkeys.add(rowkey);
+        this.qualifiers = null;
+    }
+
+       public RowKeyLogReader(EntityDefinition ed, byte[] rowkey, byte[][] qualifiers) {
+               this.ed = ed;
+               this.rowkeys = new ArrayList<>();
+               this.rowkeys.add(rowkey);
+               this.qualifiers = qualifiers;
+       }
+
+       public RowKeyLogReader(EntityDefinition ed, List<byte[]> rowkeys, byte[][] qualifiers) {
+               this.ed = ed;
+               this.rowkeys = rowkeys;
+               this.qualifiers = qualifiers;
+       }
+
+       @Override
+       public void open() throws IOException {
+               if (isOpen)
+                       return; // silently return
+               try {
+                       tbl = EagleConfigFactory.load().getHTable(ed.getTable());
+               } catch (RuntimeException ex) {
+                       throw new IOException(ex);
+               }
+               final byte[] family = ed.getColumnFamily().getBytes();
+        List<Get> gets = new ArrayList<>(this.rowkeys.size());
+
+        for(byte[] rowkey:rowkeys) {
+            Get get = new Get(rowkey);
+            get.addFamily(family);
+
+            if(qualifiers != null) {
+                for(byte[] qualifier: qualifiers){
+                    get.addColumn(family,qualifier);
+                }
+            }
+
+            gets.add(get);
+        }
+
+        entityResult = tbl.get(gets);
+               isOpen = true;
+       }
+
+       @Override
+       public void close() throws IOException {
+               if(tbl != null){
+                       new HTableFactory().releaseHTableInterface(tbl);
+               }
+       }
+
+       @Override
+       public InternalLog read() throws IOException {
+               if (entityResult == null || entityResult.length == 0 || this.getIndex >= entityResult.length - 1) {
+                       return null;
+               }
+               getIndex++;
+               InternalLog t = HBaseInternalLogHelper.parse(ed, entityResult[getIndex], this.qualifiers);
+               return t;
+       }
+}
\ No newline at end of file
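
A similar hedged sketch for direct rowkey lookup with RowKeyLogReader; ed and rowkey are assumed to come from the entity metadata and an earlier index lookup, and process(...) is again a placeholder.

    // Hedged sketch: fetch entity rows by rowkey; passing no qualifiers returns the whole column family.
    RowKeyLogReader reader = new RowKeyLogReader(ed, rowkey);
    reader.open();
    try {
        InternalLog log;
        while ((log = reader.read()) != null) {   // read() returns null once all Get results are consumed
            process(log);                         // placeholder for caller logic
        }
    } finally {
        reader.close();
    }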

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java
new file mode 100755
index 0000000..8ff3448
--- /dev/null
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.InternalLog;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableFactory;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+
+public class UniqueIndexLogReader extends IndexLogReader {
+
+       private final IndexDefinition indexDef;
+       private final List<byte[]> indexRowkeys; 
+       private final byte[][] qualifiers;
+       private final Filter filter;
+       private HTableInterface tbl;
+       private boolean isOpen = false;
+       private Result[] entityResults;
+       private int index = -1;
+
+       public UniqueIndexLogReader(IndexDefinition indexDef, List<byte[]> indexRowkeys, byte[][] qualifiers, Filter filter) {
+               this.indexDef = indexDef;
+               this.indexRowkeys = indexRowkeys;
+               this.qualifiers = qualifiers;
+               this.filter = filter;
+       }
+
+       @Override
+       public void open() throws IOException {
+               if (isOpen)
+                       return; // silently return
+               try {
+                       tbl = EagleConfigFactory.load().getHTable(indexDef.getEntityDefinition().getTable());
+               } catch (RuntimeException ex) {
+                       throw new IOException(ex);
+               }
+               final byte[] family = indexDef.getEntityDefinition().getColumnFamily().getBytes();
+        final List<Get> indexGets = new ArrayList<>();
+        for (byte[] rowkey : indexRowkeys) {
+            Get get = new Get(rowkey);
+            // Return all index qualifiers
+            get.addFamily(family);
+            indexGets.add(get);
+        }
+        final Result[] indexResults = tbl.get(indexGets);
+        indexGets.clear();
+               for (Result indexResult : indexResults) {
+                       final NavigableMap<byte[], byte[]> map = indexResult.getFamilyMap(family);
+                       if (map == null) {
+                               continue;
+                       }
+                       for (byte[] entityRowkey : map.keySet()) {
+                               Get get = new Get(entityRowkey);
+                               if (filter != null) {
+                                       get.setFilter(filter);
+                               }
+                               if (qualifiers == null) {
+                                       // Return the whole column family when no output qualifiers are specified
+                                       get.addFamily(family);
+                               } else {
+                                       for (int i = 0; i < qualifiers.length; ++i) {
+                                               // Return the specified qualifiers
+                                               get.addColumn(family, qualifiers[i]);
+                                       }
+                               }
+                               workaroundHBASE2198(get, filter, qualifiers);
+                               indexGets.add(get);
+                       }
+               }
+        entityResults = tbl.get(indexGets);
+               isOpen = true;
+       }
+
+       @Override
+       public void close() throws IOException {
+               if(tbl != null){
+                       new HTableFactory().releaseHTableInterface(tbl);
+               }
+       }
+
+       @Override
+       public InternalLog read() throws IOException {
+               if (entityResults == null) {
+                       throw new IllegalArgumentException("entityResults haven't been initialized before reading");
+               }
+               InternalLog t = null;
+               while (entityResults.length > ++index) {
+                       Result r = entityResults[index];
+                       if (r != null) {
+                               if (r.getRow() == null) {
+                                       continue;
+                               }
+                               t = HBaseInternalLogHelper.parse(indexDef.getEntityDefinition(), r, qualifiers);
+                               break;
+                       }
+               }
+               return t;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java
new file mode 100755
index 0000000..0391d57
--- /dev/null
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.LogReader;
+import org.apache.eagle.log.entity.SearchCondition;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.apache.eagle.log.entity.meta.IndexDefinition.IndexType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class UniqueIndexStreamReader extends IndexStreamReader {
+       public UniqueIndexStreamReader(IndexDefinition indexDef, SearchCondition condition) {
+               super(indexDef, condition, new ArrayList<byte[]>());
+               final IndexType type = indexDef.canGoThroughIndex(condition.getQueryExpression(), indexRowkeys);
+               if (!IndexType.UNIQUE_INDEX.equals(type)) {
+                       throw new IllegalArgumentException("This query can't go through index: " + condition.getQueryExpression());
+               }
+       }
+
+       public UniqueIndexStreamReader(IndexDefinition indexDef, SearchCondition condition, List<byte[]> indexRowkeys) {
+               super(indexDef, condition, indexRowkeys);
+       }
+
+       @Override
+       protected LogReader createIndexReader() {
+               final EntityDefinition entityDef = indexDef.getEntityDefinition();
+               byte[][] outputQualifiers = null;
+               if (!condition.isOutputAll()) {
+                       outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef, condition.getOutputFields());
+               }
+               return new UniqueIndexLogReader(indexDef, indexRowkeys, outputQualifiers, condition.getFilter());
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java
new file mode 100755
index 0000000..cf40e31
--- /dev/null
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+/**
+ * @since 7/3/2014
+ */
+public class BooleanSerDeser implements EntitySerDeser<Boolean> {
+
+       public BooleanSerDeser(){}
+
+       @Override
+       public Boolean deserialize(byte[] bytes){
+               if(bytes != null && bytes.length > 0){
+                       if(bytes[0] == 0){
+                               return false;
+                       }else if(bytes[0] == 1){
+                               return true;
+                       }
+               }
+               return null;
+       }
+
+       @Override
+       public byte[] serialize(Boolean obj){
+               if(obj != null){
+                       if(obj){
+                               return new byte[]{1};
+                       }else{
+                               return new byte[]{0};
+                       }
+               }
+               return null;
+       }
+
+       @Override
+       public Class<Boolean> type() {
+               return Boolean.class;
+       }
+}
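
A quick round-trip of the one-byte boolean encoding above (0x01 = true, 0x00 = false, anything else decodes to null); purely illustrative.

    // Hedged sketch: round-trip through BooleanSerDeser.
    BooleanSerDeser serDeser = new BooleanSerDeser();
    byte[] encoded = serDeser.serialize(Boolean.TRUE);       // -> new byte[]{1}
    Boolean decoded = serDeser.deserialize(encoded);         // -> Boolean.TRUE
    Boolean unknown = serDeser.deserialize(new byte[]{2});   // -> null (unrecognized value)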

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java
new file mode 100644
index 0000000..b64e528
--- /dev/null
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target({ElementType.FIELD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Column {
+       String value() default "";
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java
new file mode 100644
index 0000000..6e3e9c6
--- /dev/null
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ColumnFamily {
+       String value() default "f";
+}
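
For illustration, a hypothetical entity showing where the @ColumnFamily and @Column annotations above are placed: @ColumnFamily on the type selects the HBase column family, @Column on a field selects the qualifier. The class name and qualifier here are invented for this sketch, and a real entity would normally carry additional metadata annotations not shown in this patch.

    // Hedged sketch: hypothetical annotated entity (names are illustrative only).
    @ColumnFamily("f")
    public class SampleMetricEntity extends TaggedLogAPIEntity {
        @Column("a")
        private double value;    // persisted under column family "f", qualifier "a"

        public double getValue() { return value; }
        public void setValue(double value) { this.value = value; }
    }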

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java
new file mode 100644
index 0000000..27b011c
--- /dev/null
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * @since 7/22/15
+ */
+public class Double2DArraySerDeser implements EntitySerDeser<double[][]> {
+    private final int SIZE = 8;
+    @Override
+    public double[][] deserialize(byte[] bytes){
+//        if((bytes.length-4) % SIZE != 0)
+//            return null;
+        int offset = 0;
+        // get size of int array
+        int rowSize = ByteUtil.bytesToInt(bytes, offset);
+        offset += 4;
+
+        double[][] data = new double[rowSize][];
+        for(int i=0; i<rowSize; i++) {
+            int colSize = ByteUtil.bytesToInt(bytes, offset);
+            offset += 4;
+            double[] values = null;
+            if (colSize >= 0){
+                values = new double[colSize];
+                for (int j = 0; j < colSize; j++) {
+                    values[j] = ByteUtil.bytesToDouble(bytes, offset);
+                    offset += SIZE;
+                }
+            }
+            data[i] = values;
+        }
+
+        return data;
+    }
+
+    /**
+     * Serializes a 2-D double array as: [int rowCount] followed by, for each row,
+     * [int columnCount][8-byte doubles...]; a null row is encoded as column count -1.
+     *
+     * @param obj the 2-D array to serialize, may contain null rows
+     * @return the encoded bytes, or null if obj is null
+     */
+    @Override
+    public byte[] serialize(double[][] obj){
+        if(obj == null) return null;
+        ByteArrayOutputStream data = new ByteArrayOutputStream();
+        int size = obj.length;
+        byte[] sizeBytes = ByteUtil.intToBytes(size);
+        data.write(sizeBytes,0,sizeBytes.length);
+
+        try{
+            for(double[] o:obj){
+                if(o!=null){
+                    data.write(ByteUtil.intToBytes(o.length));
+                    for(double d:o){
+                        data.write(ByteUtil.doubleToBytes(d),0,SIZE);
+                    }
+                }else{
+                    data.write(ByteUtil.intToBytes(-1),0,4);
+                }
+            }
+        } catch (IOException e) {
+            // ByteArrayOutputStream never throws IOException in practice; surface it if it ever happens
+            throw new IllegalStateException(e);
+        }
+
+        byte[] bytes = data.toByteArray();
+        try {
+            data.close();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return bytes;
+    }
+
+    @Override
+    public Class<double[][]> type() {
+        return double[][].class;
+    }
+}
\ No newline at end of file
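
To make the 2-D encoding concrete: the layout is [int rowCount], then per row [int colCount] followed by colCount 8-byte doubles, with colCount -1 marking a null row. A hedged round-trip sketch:

    // Hedged sketch: round-trip through Double2DArraySerDeser, including a null row.
    Double2DArraySerDeser serDeser = new Double2DArraySerDeser();
    double[][] original = { {1.0, 2.0}, null, {3.5} };
    byte[] bytes = serDeser.serialize(original);   // rowCount=3, then [2][1.0][2.0], [-1], [1][3.5]
    double[][] restored = serDeser.deserialize(bytes);
    // restored[1] == null, restored[0][1] == 2.0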

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java
new file mode 100755
index 0000000..d87e31c
--- /dev/null
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+public class DoubleArraySerDeser implements EntitySerDeser<double[]>{
+
+       public DoubleArraySerDeser(){}
+
+       private final int SIZE = 8;
+
+       @Override
+       public double[] deserialize(byte[] bytes){
+               if((bytes.length-4) % SIZE != 0)
+                       return null;
+               int offset = 0;
+               // the first 4 bytes hold the length of the double array
+               int size = ByteUtil.bytesToInt(bytes, offset);
+               offset += 4;
+               double[] values = new double[size];
+               for(int i=0; i<size; i++){
+                       values[i] = ByteUtil.bytesToDouble(bytes, offset);
+                       offset += SIZE;
+               }
+               return values;
+       }
+
+       /**
+        * Serializes a double array as [int length] followed by 8-byte doubles.
+        *
+        * @param obj the array to serialize
+        * @return the encoded bytes, or null if obj is null
+        */
+       @Override
+       public byte[] serialize(double[] obj){
+               if(obj == null)
+                       return null;
+               int size = obj.length;
+               byte[] array = new byte[4 + SIZE*size];
+               byte[] first = ByteUtil.intToBytes(size);
+               int offset = 0;
+               System.arraycopy(first, 0, array, offset, first.length);
+               offset += first.length;
+               for(int i=0; i<size; i++){
+                       System.arraycopy(ByteUtil.doubleToBytes(obj[i]), 0, array, offset, SIZE);
+                       offset += SIZE;
+               }
+               return array;
+       }
+
+       @Override
+       public Class<double[]> type() {
+               return double[].class;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java
new file mode 100755
index 0000000..330a99d
--- /dev/null
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+public class DoubleSerDeser implements EntitySerDeser<Double>{
+
+       @Override
+       public Double deserialize(byte[] bytes){
+               if(bytes.length < 8)
+                       return null;
+               return ByteUtil.bytesToDouble(bytes);
+       }
+       
+       @Override
+       public byte[] serialize(Double obj){
+               if(obj == null)
+                       return null;
+               return ByteUtil.doubleToBytes(obj);
+       }
+
+       @Override
+       public Class<Double> type(){
+               return Double.class;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java
new file mode 100644
index 0000000..930743e
--- /dev/null
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.DateTimeUtil;
+
+public class EntityConstants {
+       
+       public static final String FIXED_WRITE_HUMANTIME = "1970-01-02 00:00:00";
+       public static final String FIXED_READ_START_HUMANTIME = "1970-01-01 00:00:00";
+       public static final String FIXED_READ_END_HUMANTIME = "1970-01-03 00:00:00";
+
+       public static final long FIXED_WRITE_TIMESTAMP =
+                       DateTimeUtil.humanDateToSecondsWithoutException(FIXED_WRITE_HUMANTIME) * 1000;
+
+}
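
As a sanity check on these constants: assuming DateTimeUtil.humanDateToSecondsWithoutException parses the human-readable time as UTC (the actual timezone comes from the Eagle configuration), the fixed write time pins every such row to one day after the epoch, and the read window brackets it by a day on each side. A small, hedged computation:

    // Hedged sketch (assumes UTC parsing, mirroring the constants above).
    long writeTs = java.time.LocalDateTime.parse("1970-01-02T00:00:00")
            .toEpochSecond(java.time.ZoneOffset.UTC) * 1000;   // 86,400,000 ms
    long readStart = 0L;                                        // 1970-01-01 00:00:00 UTC
    long readEnd = 2 * 86_400_000L;                             // 1970-01-03 00:00:00 UTC
    assert readStart <= writeTs && writeTs < readEnd;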
