http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
index 00ece40..15a9f74 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
@@ -26,6 +26,7 @@ import org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.ArrayColumnExpression;
 import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.DivideExpression;
 import org.apache.phoenix.expression.Expression;
@@ -80,6 +81,11 @@ public abstract class CloneExpressionVisitor extends 
TraverseAllExpressionVisito
     public Expression visit(KeyValueColumnExpression node) {
         return node;
     }
+    
+    @Override
+    public Expression visit(ArrayColumnExpression node) {
+        return node;
+    }
 
     @Override
     public Expression visit(ProjectedColumnExpression node) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
index 31f340d..100f099 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
@@ -27,6 +27,7 @@ import org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.ArrayColumnExpression;
 import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.DivideExpression;
 import org.apache.phoenix.expression.Expression;
@@ -113,6 +114,7 @@ public interface ExpressionVisitor<E> {
     public E visit(LiteralExpression node);
     public E visit(RowKeyColumnExpression node);
     public E visit(KeyValueColumnExpression node);
+    public E visit(ArrayColumnExpression node);
     public E visit(ProjectedColumnExpression node);
     public E visit(SequenceValueExpression node);
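
The interface addition above is what drives the matching visit(ArrayColumnExpression) overrides in CloneExpressionVisitor and the two stateless traversal visitors in this patch. Below is a minimal sketch of a downstream visitor that now has to handle both column node types; it is not part of this commit, the class name is made up, and the only accessors assumed are the ones visible elsewhere in this diff (getColumnFamily()/getColumnQualifier() on KeyValueColumnExpression, getKeyValueExpression() on ArrayColumnExpression).

import java.util.HashSet;
import java.util.Set;

import org.apache.phoenix.expression.ArrayColumnExpression;
import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;

public class ReferencedColumnCollector extends StatelessTraverseAllExpressionVisitor<Void> {
    private final Set<ColumnReference> refs = new HashSet<>();

    @Override
    public Void visit(KeyValueColumnExpression node) {
        // A plain key value column exposes its family and (possibly encoded) qualifier directly.
        refs.add(new ColumnReference(node.getColumnFamily(), node.getColumnQualifier()));
        return null;
    }

    @Override
    public Void visit(ArrayColumnExpression node) {
        // A column stored in a single cell wraps the underlying key value column;
        // unwrap it to recover the data column family and qualifier.
        KeyValueColumnExpression kv = node.getKeyValueExpression();
        refs.add(new ColumnReference(kv.getColumnFamily(), kv.getColumnQualifier()));
        return null;
    }

    public Set<ColumnReference> getReferencedColumns() {
        return refs;
    }
}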
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
index 3b7067a..9e50bc4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
@@ -26,9 +26,9 @@ import org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.ArrayColumnExpression;
 import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.DivideExpression;
-import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
 import org.apache.phoenix.expression.IsNullExpression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -121,6 +121,11 @@ public class StatelessTraverseAllExpressionVisitor<E> 
extends TraverseAllExpress
     }
     
     @Override
+    public E visit(ArrayColumnExpression node) {
+        return null;
+    }
+    
+    @Override
     public E visit(ProjectedColumnExpression node) {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
index 83b28bd..1a2f2cc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
@@ -26,9 +26,9 @@ import org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.ArrayColumnExpression;
 import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.DivideExpression;
-import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
 import org.apache.phoenix.expression.IsNullExpression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -114,6 +114,11 @@ public class StatelessTraverseNoExpressionVisitor<E> 
extends TraverseNoExpressio
     public E visit(RowKeyColumnExpression node) {
         return null;
     }
+    
+    @Override
+    public E visit(ArrayColumnExpression node) {
+        return null;
+    }
 
     @Override
     public E visit(KeyValueColumnExpression node) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
index 92e5c20..3d6843d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 
 /**
  * When selecting specific columns in a SELECT query, this filter passes only 
selected columns
@@ -54,6 +54,8 @@ public class ColumnProjectionFilter extends FilterBase 
implements Writable {
     private byte[] emptyCFName;
     private Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> 
columnsTracker;
     private Set<byte[]> conditionOnlyCfs;
+    private boolean usesEncodedColumnNames;
+    private byte[] emptyKVQualifier;
 
     public ColumnProjectionFilter() {
 
@@ -61,10 +63,12 @@ public class ColumnProjectionFilter extends FilterBase 
implements Writable {
 
     public ColumnProjectionFilter(byte[] emptyCFName,
             Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> 
columnsTracker,
-            Set<byte[]> conditionOnlyCfs) {
+            Set<byte[]> conditionOnlyCfs, boolean usesEncodedColumnNames) {
         this.emptyCFName = emptyCFName;
         this.columnsTracker = columnsTracker;
         this.conditionOnlyCfs = conditionOnlyCfs;
+        this.usesEncodedColumnNames = usesEncodedColumnNames;
+        this.emptyKVQualifier = 
EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst();
     }
 
     @Override
@@ -88,6 +92,9 @@ public class ColumnProjectionFilter extends FilterBase 
implements Writable {
             familyMapSize--;
         }
         int conditionOnlyCfsSize = WritableUtils.readVInt(input);
+        usesEncodedColumnNames = conditionOnlyCfsSize > 0;
+        emptyKVQualifier = 
EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst();
+        conditionOnlyCfsSize = Math.abs(conditionOnlyCfsSize) - 1; // restore 
to the actual value.
         this.conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
         while (conditionOnlyCfsSize > 0) {
             
this.conditionOnlyCfs.add(WritableUtils.readCompressedByteArray(input));
@@ -111,12 +118,13 @@ public class ColumnProjectionFilter extends FilterBase 
implements Writable {
                 }
             }
         }
-        // Write conditionOnlyCfs
-        WritableUtils.writeVInt(output, this.conditionOnlyCfs.size());
+        // Encode usesEncodedColumnNames in conditionOnlyCfs size.
+        WritableUtils.writeVInt(output, (this.conditionOnlyCfs.size() + 1) * 
(usesEncodedColumnNames ? 1 : -1));
         for (byte[] f : this.conditionOnlyCfs) {
             WritableUtils.writeCompressedByteArray(output, f);
         }
-    }
+    
+}
 
     @Override
     public byte[] toByteArray() throws IOException {
@@ -156,9 +164,9 @@ public class ColumnProjectionFilter extends FilterBase 
implements Writable {
         // make sure we're not holding to any of the byte[]'s
         ptr.set(HConstants.EMPTY_BYTE_ARRAY);
         if (kvs.isEmpty()) {
-            kvs.add(new KeyValue(firstKV.getRowArray(), 
firstKV.getRowOffset(),firstKV.getRowLength(), this.emptyCFName,
-                    0, this.emptyCFName.length, 
QueryConstants.EMPTY_COLUMN_BYTES, 0,
-                    QueryConstants.EMPTY_COLUMN_BYTES.length, 
HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0));
+            kvs.add(new KeyValue(firstKV.getRowArray(), 
firstKV.getRowOffset(), firstKV.getRowLength(),
+                    this.emptyCFName, 0, this.emptyCFName.length, 
emptyKVQualifier, 0,
+                    emptyKVQualifier.length, HConstants.LATEST_TIMESTAMP, 
Type.Maximum, null, 0, 0));
         }
     }
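
The readFields/writeToOutput changes above avoid a wire-format bump by folding the new usesEncodedColumnNames flag into the sign of the serialized conditionOnlyCfs size, shifted by one so an empty set still carries a sign; the same convention shows up again later in this patch for the indexedExpressions count in IndexMaintainer. A small self-contained illustration of that convention follows; the class and method names are invented for the example.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.WritableUtils;

public class SignEncodedSizeExample {

    // Write side: (size + 1) keeps the value non-zero, and the sign carries the boolean flag.
    static void writeSizeAndFlag(DataOutputStream out, int size, boolean flag) throws IOException {
        WritableUtils.writeVInt(out, (size + 1) * (flag ? 1 : -1));
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        writeSizeAndFlag(new DataOutputStream(buffer), 0, true);

        // Read side: the sign restores the flag, Math.abs(..) - 1 restores the actual size.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        int encoded = WritableUtils.readVInt(in);
        boolean flag = encoded > 0;
        int size = Math.abs(encoded) - 1;
        System.out.println("flag=" + flag + ", size=" + size); // prints flag=true, size=0
    }
}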
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
index dba700b..a7146fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
@@ -26,6 +26,7 @@ import java.util.TreeSet;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.ArrayColumnExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.visitor.ExpressionVisitor;
@@ -94,7 +95,7 @@ public abstract class MultiKeyValueComparisonFilter extends 
BooleanExpressionFil
             refCount = foundColumns.size();
         }
         
-        public ReturnCode resolveColumn(Cell value) {
+        private ReturnCode resolveColumn(Cell value) {
             // Always set key, in case we never find a key value column of 
interest,
             // and our expression uses row key columns.
             setKey(value);
@@ -184,7 +185,7 @@ public abstract class MultiKeyValueComparisonFilter extends 
BooleanExpressionFil
         ExpressionVisitor<Void> visitor = new 
StatelessTraverseAllExpressionVisitor<Void>() {
             @Override
             public Void visit(KeyValueColumnExpression expression) {
-                inputTuple.addColumn(expression.getColumnFamily(), 
expression.getColumnName());
+                inputTuple.addColumn(expression.getColumnFamily(), 
expression.getColumnQualifier());
                 return null;
             }
         };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
index 0d904bc..195c89c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java
@@ -47,7 +47,8 @@ public class SingleCQKeyValueComparisonFilter extends SingleKeyValueComparisonFi
 
     public static SingleCQKeyValueComparisonFilter parseFrom(final byte [] 
pbBytes) throws DeserializationException {
         try {
-            return 
(SingleCQKeyValueComparisonFilter)Writables.getWritable(pbBytes, new 
SingleCQKeyValueComparisonFilter());
+            SingleCQKeyValueComparisonFilter writable = 
(SingleCQKeyValueComparisonFilter)Writables.getWritable(pbBytes, new 
SingleCQKeyValueComparisonFilter());
+            return writable;
         } catch (IOException e) {
             throw new DeserializationException(e);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
index eaf8d35..b97c4e9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
@@ -22,11 +22,13 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.ArrayColumnExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import 
org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
 import org.apache.phoenix.expression.visitor.TraverseAllExpressionVisitor;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
 
 
 
@@ -58,7 +60,7 @@ public abstract class SingleKeyValueComparisonFilter extends 
BooleanExpressionFi
             @Override
             public Void visit(KeyValueColumnExpression expression) {
                 cf = expression.getColumnFamily();
-                cq = expression.getColumnName();
+                cq = expression.getColumnQualifier();
                 return null;
             }
         };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
index bcadc2b..19797cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
@@ -35,4 +35,5 @@ public interface ValueGetter {
   public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws 
IOException;
   
   public byte[] getRowKey();
+  
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 6f9caa6..0f960e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -32,7 +32,6 @@ import org.apache.phoenix.hbase.index.covered.TableState;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Lists;
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java
index 741bf87..56b60e9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java
@@ -125,4 +125,5 @@ public abstract class KeyValueBuilder {
   public abstract KVComparator getKeyValueComparator();
   
   public abstract List<Mutation> cloneIfNecessary(List<Mutation> mutations);
+  
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 237ed75..7c88a25 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -17,12 +17,15 @@
  */
 package org.apache.phoenix.index;
 
+import static 
org.apache.phoenix.util.EncodedColumnsUtil.getEncodedColumnQualifier;
+
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,6 +35,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
@@ -44,16 +48,20 @@ import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.IndexExpressionCompiler;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.ArrayColumnExpression;
+import org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -67,14 +75,16 @@ import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor;
 import org.apache.phoenix.parse.UDFParseNode;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.AmbiguousColumnException;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTable.StorageScheme;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SaltingUtil;
@@ -82,10 +92,13 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema;
 import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.tuple.BaseTuple;
 import org.apache.phoenix.schema.tuple.ValueGetterTuple;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.BitSet;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
@@ -93,6 +106,7 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 import org.apache.tephra.TxConstants;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
@@ -276,8 +290,14 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     // columns required to evaluate all expressions in indexedExpressions 
(this does not include columns in the data row key)
     private Set<ColumnReference> indexedColumns;
     private Set<ColumnReference> coveredColumns;
-    // Map used to cache column family of data table and the corresponding 
column family for the local index
-    private Map<ImmutableBytesPtr, ImmutableBytesWritable> 
dataTableLocalIndexFamilyMap;
+    // Information for columns of data tables that are being indexed. The 
first part of the pair is column family and second part is the column name. 
+    private Set<Pair<String, String>> indexedColumnsInfo;
+    // Information for columns of data tables that are being covered by the 
index. The first part of the pair is column family and second part is the 
column name.
+    private Set<Pair<String, String>> coveredColumnsInfo;
+    // Map of covered columns where a key is column reference for a column in 
the data table
+    // and value is column reference for corresponding column in the index 
table.
+    // TODO: samarth confirm that we don't need a separate map for tracking 
column families of local indexes.
+    private Map<ColumnReference, ColumnReference> coveredColumnsMap;
     // columns required to create index row i.e. indexedColumns + 
coveredColumns  (this does not include columns in the data row key)
     private Set<ColumnReference> allColumns;
     // TODO remove this in the next major release
@@ -291,39 +311,40 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     private boolean indexWALDisabled;
     private boolean isLocalIndex;
     private boolean immutableRows;
+    private boolean storeColsInSingleCell;
 
     // Transient state
     private final boolean isDataTableSalted;
     private final RowKeySchema dataRowKeySchema;
     
-    private List<ImmutableBytesPtr> indexQualifiers;
     private int estimatedIndexRowKeyBytes;
     private int estimatedExpressionSize;
     private int[] dataPkPosition;
     private int maxTrailingNulls;
     private ColumnReference dataEmptyKeyValueRef;
     private boolean rowKeyOrderOptimizable;
+    private boolean usesEncodedColumnNames;
+    private ImmutableBytesPtr emptyKeyValueQualifierPtr;
     
     private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean 
isDataTableSalted) {
         this.dataRowKeySchema = dataRowKeySchema;
         this.isDataTableSalted = isDataTableSalted;
     }
 
-    private IndexMaintainer(PTable dataTable, PTable index, PhoenixConnection 
connection) {
+    private IndexMaintainer(final PTable dataTable, final PTable index, 
PhoenixConnection connection) {
         this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null);
         assert(dataTable.getType() == PTableType.SYSTEM || dataTable.getType() 
== PTableType.TABLE || dataTable.getType() == PTableType.VIEW);
         this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable();
         this.isMultiTenant = dataTable.isMultiTenant();
         this.viewIndexId = index.getViewIndexId() == null ? null : 
MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId());
         this.isLocalIndex = index.getIndexType() == IndexType.LOCAL;
-
+        this.usesEncodedColumnNames = 
EncodedColumnsUtil.usesEncodedColumnNames(index);
         byte[] indexTableName = index.getPhysicalName().getBytes();
         // Use this for the nDataSaltBuckets as we need this for local indexes
         // TODO: persist nDataSaltBuckets separately, but maintain b/w compat.
         Integer nIndexSaltBuckets = isLocalIndex ? dataTable.getBucketNum() : 
index.getBucketNum();
         boolean indexWALDisabled = index.isWALDisabled();
         int indexPosOffset = (index.getBucketNum() == null ? 0 : 1) + 
(this.isMultiTenant ? 1 : 0) + (this.viewIndexId == null ? 0 : 1);
-//        int indexPosOffset = !isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0;
         int nIndexColumns = index.getColumns().size() - indexPosOffset;
         int nIndexPKColumns = index.getPKColumns().size() - indexPosOffset;
         // number of expressions that are indexed that are not present in the 
row key of the data table
@@ -334,7 +355,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             String dataFamilyName = 
IndexUtil.getDataColumnFamilyName(indexColumnName);
             String dataColumnName = 
IndexUtil.getDataColumnName(indexColumnName);
             try {
-                PColumn dataColumn = dataFamilyName.equals("") ? 
dataTable.getColumn(dataColumnName) : 
dataTable.getColumnFamily(dataFamilyName).getColumn(dataColumnName);
+                PColumn dataColumn = dataFamilyName.equals("") ? 
dataTable.getPColumnForColumnName(dataColumnName) : 
dataTable.getColumnFamily(dataFamilyName).getPColumnForColumnName(dataColumnName);
                 if (SchemaUtil.isPKColumn(dataColumn)) 
                     continue;
             } catch (ColumnNotFoundException e) {
@@ -367,7 +388,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         this.indexedColumnTypes = 
Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
         this.indexedExpressions = 
Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
         this.coveredColumns = 
Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns);
-        this.dataTableLocalIndexFamilyMap = 
Maps.newHashMapWithExpectedSize(nIndexColumns-nIndexPKColumns);
+        this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns 
- nIndexPKColumns);
         this.nIndexSaltBuckets  = nIndexSaltBuckets == null ? 0 : 
nIndexSaltBuckets;
         this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable);
         this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index);
@@ -376,6 +397,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         // TODO: check whether index is immutable or not. Currently it's 
always false so checking
         // data table is with immutable rows or not.
         this.immutableRows = dataTable.isImmutableRows();
+        this.storeColsInSingleCell = index.getStorageScheme() == 
StorageScheme.COLUMNS_STORED_IN_SINGLE_CELL;
         int indexColByteSize = 0;
         ColumnResolver resolver = null;
         List<ParseNode> parseNodes = new ArrayList<ParseNode>(1);
@@ -397,6 +419,9 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             throw new RuntimeException(e); // Impossible
         }
         StatementContext context = new StatementContext(new 
PhoenixStatement(connection), resolver);
+        this.indexedColumnsInfo = 
Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns);
+        this.coveredColumnsInfo = 
Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns);
+        
         IndexExpressionCompiler expressionIndexCompiler = new 
IndexExpressionCompiler(context);
         for (int i = indexPosOffset; i < index.getPKColumns().size(); i++) {
             PColumn indexColumn = index.getPKColumns().get(i);
@@ -409,12 +434,13 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                 throw new RuntimeException(e); // Impossible
             }
             if ( expressionIndexCompiler.getColumnRef()!=null ) {
-               // get the column of the data table that corresponds to this 
index column
+               // get the column of the data column that corresponds to this 
index column
                    PColumn column = IndexUtil.getDataColumn(dataTable, 
indexColumn.getName().getString());
                    boolean isPKColumn = SchemaUtil.isPKColumn(column);
                    if (isPKColumn) {
                        int dataPkPos = 
dataTable.getPKColumns().indexOf(column) - (dataTable.getBucketNum() == null ? 
0 : 1) - (this.isMultiTenant ? 1 : 0);
                        this.rowKeyMetaData.setIndexPkPosition(dataPkPos, 
indexPos);
+                       indexedColumnsInfo.add(new Pair<>((String)null, 
column.getName().getString()));
                    } else {
                        indexColByteSize += column.getDataType().isFixedWidth() 
? SchemaUtil.getFixedByteSize(column) : 
ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE;
                        try {
@@ -424,6 +450,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                                expression = 
CoerceExpression.create(expression, indexColumn.getDataType());
                            }
                         this.indexedExpressions.add(expression);
+                        indexedColumnsInfo.add(new 
Pair<>(column.getFamilyName().getString(), column.getName().getString()));
                     } catch (SQLException e) {
                         throw new RuntimeException(e); // Impossible
                     }
@@ -432,6 +459,45 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             else {
                indexColByteSize += expression.getDataType().isFixedWidth() ? 
SchemaUtil.getFixedByteSize(expression) : 
ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE;
                 this.indexedExpressions.add(expression);
+                KeyValueExpressionVisitor kvVisitor = new 
KeyValueExpressionVisitor() {
+                    @Override
+                    public Void visit(KeyValueColumnExpression colExpression) {
+                        return addDataColInfo(dataTable, colExpression);
+                    }
+
+                    @Override
+                    public Void visit(ArrayColumnExpression expression) {
+                        return addDataColInfo(dataTable, expression);
+                    }
+
+                    private Void addDataColInfo(final PTable dataTable, 
Expression expression) {
+                        Preconditions.checkArgument(expression instanceof 
ArrayColumnExpression
+                                || expression instanceof 
KeyValueColumnExpression);
+
+                        KeyValueColumnExpression colExpression = null;
+                        if (expression instanceof ArrayColumnExpression) {
+                            colExpression =
+                                    ((ArrayColumnExpression) 
expression).getKeyValueExpression();
+                        } else {
+                            colExpression = ((KeyValueColumnExpression) 
expression);
+                        }
+                        byte[] cf = colExpression.getColumnFamily();
+                        byte[] cq = colExpression.getColumnQualifier();
+                        try {
+                            PColumn dataColumn =
+                                    cf == null ? 
dataTable.getPColumnForColumnQualifier(null, cq)
+                                            : dataTable.getColumnFamily(cf)
+                                                    
.getPColumnForColumnQualifier(cq);
+                            indexedColumnsInfo.add(new 
Pair<>(dataColumn.getFamilyName()
+                                    .getString(), 
dataColumn.getName().getString()));
+                        } catch (ColumnNotFoundException | 
ColumnFamilyNotFoundException
+                                | AmbiguousColumnException e) {
+                            throw new RuntimeException(e);
+                        }
+                        return null;
+                    }
+                };
+                expression.accept(kvVisitor);
             }
             // set the sort order of the expression correctly
             if (indexColumn.getSortOrder() == SortOrder.DESC) {
@@ -442,18 +508,19 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         for (int i = 0; i < index.getColumnFamilies().size(); i++) {
             PColumnFamily family = index.getColumnFamilies().get(i);
             for (PColumn indexColumn : family.getColumns()) {
-                PColumn column = IndexUtil.getDataColumn(dataTable, 
indexColumn.getName().getString());
-                PName dataTableFamily = column.getFamilyName();
-                this.coveredColumns.add(new 
ColumnReference(dataTableFamily.getBytes(), column.getName().getBytes()));
-                if(isLocalIndex) {
-                    this.dataTableLocalIndexFamilyMap.put(new 
ImmutableBytesPtr(dataTableFamily.getBytes()), new 
ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(dataTableFamily.getString()))));
-                }
+                PColumn dataColumn = IndexUtil.getDataColumn(dataTable, 
indexColumn.getName().getString());
+                byte[] dataColumnCq = 
EncodedColumnsUtil.getColumnQualifier(dataColumn, dataTable);
+                byte[] indexColumnCq = 
EncodedColumnsUtil.getColumnQualifier(indexColumn, index);
+                this.coveredColumns.add(new 
ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq));
+                this.coveredColumnsMap.put(new 
ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq), 
+                        new 
ColumnReference(indexColumn.getFamilyName().getBytes(), indexColumnCq));
+                this.coveredColumnsInfo.add(new 
Pair<>(dataColumn.getFamilyName().getString(), 
dataColumn.getName().getString()));
             }
         }
         this.estimatedIndexRowKeyBytes = 
estimateIndexRowKeyByteSize(indexColByteSize);
         initCachedState();
     }
-
+    
     public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable 
rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey)  {
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         boolean prependRegionStartKey = isLocalIndex && regionStartKey != null;
@@ -856,35 +923,113 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     }
 
     public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter 
valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] 
regionStartKey, byte[] regionEndKey) throws IOException {
-        Put put = null;
+       byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, 
regionStartKey, regionEndKey);
+       Put put = null;
         // New row being inserted: add the empty key value
         if (valueGetter.getLatestValue(dataEmptyKeyValueRef) == null) {
-            byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, 
regionStartKey, regionEndKey);
             put = new Put(indexRowKey);
             // add the keyvalue for the empty row
             put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
-                this.getEmptyKeyValueFamily(), 
QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts,
+                this.getEmptyKeyValueFamily(), emptyKeyValueQualifierPtr, ts,
                 // set the value to the empty column name
-                QueryConstants.EMPTY_COLUMN_BYTES_PTR));
+                emptyKeyValueQualifierPtr));
             put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : 
Durability.SKIP_WAL);
         }
-        int i = 0;
-        for (ColumnReference ref : this.getCoveredColumns()) {
-            ImmutableBytesPtr cq = this.indexQualifiers.get(i++);
-            ImmutableBytesWritable value = valueGetter.getLatestValue(ref);
-            byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, 
regionStartKey, regionEndKey);
-            ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
-            if (value != null) {
+        ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
+        if (storeColsInSingleCell) {
+            // map from index column family to list of pair of index column 
and data column (for covered columns)
+            Map<ByteBuffer, List<Pair<ColumnReference, ColumnReference>>> 
familyToColListMap = Maps.newHashMap();
+            for (ColumnReference ref : this.getCoveredColumns()) {
+               ColumnReference indexColRef = this.coveredColumnsMap.get(ref);
+                ByteBuffer cf = ByteBuffer.wrap(indexColRef.getFamily());
+                if (!familyToColListMap.containsKey(cf)) {
+                    familyToColListMap.put(cf, Lists.<Pair<ColumnReference, 
ColumnReference>>newArrayList());
+                }
+                familyToColListMap.get(cf).add(Pair.newPair(indexColRef, ref));
+            }
+            // iterate over each column family and create a byte[] containing 
all the columns 
+            for (Entry<ByteBuffer, List<Pair<ColumnReference, 
ColumnReference>>> entry : familyToColListMap.entrySet()) {
+                byte[] columnFamily = entry.getKey().array();
+                List<Pair<ColumnReference, ColumnReference>> colRefPairs = 
entry.getValue();
+                int maxIndex = Integer.MIN_VALUE;
+                // find the max col qualifier
+                for (Pair<ColumnReference, ColumnReference> colRefPair : 
colRefPairs) {
+                    int qualifier = 
getEncodedColumnQualifier(colRefPair.getFirst().getQualifier());
+                    maxIndex = Math.max(maxIndex, qualifier);
+                }
+                byte[][] colValues = new byte[maxIndex+1][];
+                // set the values of the columns
+                for (Pair<ColumnReference, ColumnReference> colRefPair : 
colRefPairs) {
+                       ColumnReference indexColRef = colRefPair.getFirst();
+                       ColumnReference dataColRef = colRefPair.getSecond();
+                       int dataArrayPos = 
getEncodedColumnQualifier(dataColRef.getQualifier());
+                       Expression expression = new ArrayColumnExpression(new 
PDatum() {
+                                               @Override
+                                               public boolean isNullable() {
+                                                       return false;
+                                               }
+                                               
+                                               @Override
+                                               public SortOrder getSortOrder() 
{
+                                                       return null;
+                                               }
+                                               
+                                               @Override
+                                               public Integer getScale() {
+                                                       return null;
+                                               }
+                                               
+                                               @Override
+                                               public Integer getMaxLength() {
+                                                       return null;
+                                               }
+                                               
+                                               @Override
+                                               public PDataType getDataType() {
+                                                       return null;
+                                               }
+                                       }, dataColRef.getFamily(), 
dataArrayPos);
+                       ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+                    expression.evaluate(new ValueGetterTuple(valueGetter), 
ptr);
+                    byte[] value = ptr.copyBytesIfNecessary();
+                                       if (value != null) {
+                                               int indexArrayPos = 
getEncodedColumnQualifier(indexColRef.getQualifier());
+                        colValues[indexArrayPos] = value;
+                    }
+                }
+                
+                List<Expression> children = 
Lists.newArrayListWithExpectedSize(colRefPairs.size());
+                // create an expression list with all the columns
+                for (int j=0; j<colValues.length; ++j) {
+                    children.add(new LiteralExpression(colValues[j]==null ? 
ByteUtil.EMPTY_BYTE_ARRAY : colValues[j] ));
+                }
+                // we use ArrayConstructorExpression to serialize multiple 
columns into a single byte[]
+                // construct the ArrayConstructorExpression with a variable 
length data type (PVarchar) since columns can be of fixed or variable length 
+                ArrayConstructorExpression arrayExpression = new 
ArrayConstructorExpression(children, PVarchar.INSTANCE, rowKeyOrderOptimizable);
+                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                arrayExpression.evaluate(new BaseTuple() {}, ptr);
                 if (put == null) {
                     put = new Put(indexRowKey);
                     put.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL);
                 }
+                ImmutableBytesPtr colFamilyPtr = new 
ImmutableBytesPtr(columnFamily);
                 //this is a little bit of extra work for installations that 
are running <0.94.14, but that should be rare and is a short-term set of 
wrappers - it shouldn't kill GC
-                if(this.isLocalIndex) {
-                    ImmutableBytesWritable localIndexColFamily = 
this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable());
-                    put.add(kvBuilder.buildPut(rowKey, localIndexColFamily, 
cq, ts, value));
-                } else {
-                    put.add(kvBuilder.buildPut(rowKey, 
ref.getFamilyWritable(), cq, ts, value));
+                put.add(kvBuilder.buildPut(rowKey, colFamilyPtr, colFamilyPtr, 
ts, ptr));
+            }
+        }
+        else {
+               for (ColumnReference ref : this.getCoveredColumns()) {
+                //FIXME: samarth figure out a backward compatible way to 
handle this as coveredcolumnsmap won't be availble for older phoenix clients.
+                ColumnReference indexColRef = this.coveredColumnsMap.get(ref);
+                ImmutableBytesPtr cq = indexColRef.getQualifierWritable();
+                ImmutableBytesPtr cf = indexColRef.getFamilyWritable();
+                ImmutableBytesWritable value = valueGetter.getLatestValue(ref);
+                if (value != null) {
+                       if (put == null) {
+                        put = new Put(indexRowKey);
+                        put.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL);
+                    }
+                       put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value));
                 }
             }
         }
@@ -973,14 +1118,12 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             
             for (ColumnReference ref : getCoveredColumns()) {
                 byte[] family = ref.getFamily();
-                if (this.isLocalIndex) {
-                    family = 
this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get();
-                }
+                ColumnReference indexColumn = coveredColumnsMap.get(ref);
                 // If table delete was single version, then index delete 
should be as well
                 if (deleteType == DeleteType.SINGLE_VERSION) {
-                    delete.deleteFamilyVersion(family, ts);
+                    delete.deleteFamilyVersion(indexColumn.getFamily(), ts);
                 } else {
-                    delete.deleteFamily(family, ts);
+                    delete.deleteFamily(indexColumn.getFamily(), ts);
                 }
             }
             if (deleteType == DeleteType.SINGLE_VERSION) {
@@ -1001,12 +1144,15 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                         delete = new Delete(indexRowKey);                    
                         delete.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL);
                     }
-                    byte[] family = this.isLocalIndex ? 
this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get() : 
ref.getFamily();
+                    ColumnReference indexColumn = coveredColumnsMap.get(ref);
                     // If point delete for data table, then use point delete 
for index as well
-                    if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) {
-                        delete.deleteColumn(family, 
IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                    if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { 
+                        //FIXME: samarth change this. Index column qualifiers 
are not derivable from data table cqs.
+                        // Figure out a backward compatible way of going this 
since coveredColumnsMap won't be available
+                        // for older clients.
+                        delete.deleteColumn(indexColumn.getFamily(), 
indexColumn.getQualifier(), ts);
                     } else {
-                        delete.deleteColumns(family, 
IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                        delete.deleteColumns(indexColumn.getFamily(), 
indexColumn.getQualifier(), ts);
                     }
                 }
             }
@@ -1061,15 +1207,16 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0;
         int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1;
         coveredColumns = 
Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns);
-        dataTableLocalIndexFamilyMap = 
Maps.newHashMapWithExpectedSize(nCoveredColumns);
+        coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns);
         for (int i = 0; i < nCoveredColumns; i++) {
-            byte[] cf = Bytes.readByteArray(input);
-            byte[] cq = Bytes.readByteArray(input);
-            ColumnReference ref = new ColumnReference(cf,cq);
-            coveredColumns.add(ref);
-            if(isLocalIndex) {
-                dataTableLocalIndexFamilyMap.put(ref.getFamilyWritable(), new 
ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(Bytes.toString(cf)))));
-            }
+            byte[] dataTableCf = Bytes.readByteArray(input);
+            byte[] dataTableCq = Bytes.readByteArray(input);
+            byte[] indexTableCf = Bytes.readByteArray(input);
+            byte[] indexTableCq = Bytes.readByteArray(input);
+            ColumnReference dataColumn = new ColumnReference(dataTableCf, 
dataTableCq); 
+            coveredColumns.add(dataColumn);
+            ColumnReference indexColumn = new ColumnReference(indexTableCf, 
indexTableCq);
+            coveredColumnsMap.put(dataColumn, indexColumn);
         }
         // Hack to serialize whether the index row key is optimizable
         int len = WritableUtils.readVInt(input);
@@ -1095,6 +1242,8 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         
         if (isNewClient) {
             int numIndexedExpressions = WritableUtils.readVInt(input);
+            usesEncodedColumnNames = numIndexedExpressions > 0;
+            numIndexedExpressions = Math.abs(numIndexedExpressions) - 1;
             indexedExpressions = 
Lists.newArrayListWithExpectedSize(numIndexedExpressions);        
             for (int i = 0; i < numIndexedExpressions; i++) {
                Expression expression = 
ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
@@ -1148,6 +1297,22 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         int encodedEstimatedIndexRowKeyBytesAndImmutableRows = 
WritableUtils.readVInt(input);
         this.immutableRows = encodedEstimatedIndexRowKeyBytesAndImmutableRows 
< 0;
         this.estimatedIndexRowKeyBytes = 
Math.abs(encodedEstimatedIndexRowKeyBytesAndImmutableRows);
+        int numCols = WritableUtils.readVInt(input);
+        //TODO: samarth figure out a backward compatible way of 
reading/writing indexedColumnsInfo
+        indexedColumnsInfo = Sets.newHashSetWithExpectedSize(numCols);
+        for (int i = 1; i <= numCols; i++) {
+            byte[] dataTableCf = Bytes.readByteArray(input);
+            byte[] dataTableCq = Bytes.readByteArray(input);
+            indexedColumnsInfo.add(new Pair<>(Bytes.toString(dataTableCf), 
Bytes.toString(dataTableCq)));
+        }
+        coveredColumnsInfo = Sets.newHashSetWithExpectedSize(numCols);
+        int numCoveredCols = WritableUtils.readVInt(input);
+        for (int i = 1; i <= numCoveredCols; i++) {
+            byte[] dataTableCf = Bytes.readByteArray(input);
+            byte[] dataTableCq = Bytes.readByteArray(input);
+            coveredColumnsInfo.add(new Pair<>(Bytes.toString(dataTableCf), 
Bytes.toString(dataTableCq)));
+        }
+        storeColsInSingleCell = WritableUtils.readVInt(input) > 0;
         initCachedState();
     }
     
@@ -1171,9 +1336,13 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         }
         // Encode coveredColumns.size() and whether or not this is a local 
index
         WritableUtils.writeVInt(output, (coveredColumns.size() + 1) * 
(isLocalIndex ? -1 : 1));
-        for (ColumnReference ref : coveredColumns) {
-            Bytes.writeByteArray(output, ref.getFamily());
-            Bytes.writeByteArray(output, ref.getQualifier());
+        for (Entry<ColumnReference, ColumnReference> ref : 
coveredColumnsMap.entrySet()) {
+            ColumnReference dataColumn = ref.getKey();
+            ColumnReference indexColumn = ref.getValue();
+            Bytes.writeByteArray(output, dataColumn.getFamily());
+            Bytes.writeByteArray(output, dataColumn.getQualifier());
+            Bytes.writeByteArray(output, indexColumn.getFamily());
+            Bytes.writeByteArray(output, indexColumn.getQualifier());
         }
         // TODO: remove when rowKeyOrderOptimizable hack no longer needed
         WritableUtils.writeVInt(output,indexTableName.length * 
(rowKeyOrderOptimizable ? 1 : -1));
@@ -1184,7 +1353,9 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         WritableUtils.writeVInt(output,-emptyKeyValueCFPtr.getLength());
         output.write(emptyKeyValueCFPtr.get(),emptyKeyValueCFPtr.getOffset(), 
emptyKeyValueCFPtr.getLength());
         
-        WritableUtils.writeVInt(output, indexedExpressions.size());
+        // Hack to encode usesEncodedColumnNames in indexedExpressions size.
+        int indexedExpressionsSize = (indexedExpressions.size() + 1) * 
(usesEncodedColumnNames ? 1 : -1);
+        WritableUtils.writeVInt(output, indexedExpressionsSize);
         for (Expression expression : indexedExpressions) {
                WritableUtils.writeVInt(output, 
ExpressionType.valueOf(expression).ordinal());
                expression.write(output);
@@ -1195,6 +1366,17 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         WritableUtils.writeVInt(output, (nDataCFs + 1) * (indexWALDisabled ? 
-1 : 1));
         // Encode estimatedIndexRowKeyBytes and immutableRows together.
         WritableUtils.writeVInt(output, estimatedIndexRowKeyBytes * 
(immutableRows ? -1 : 1));
+        WritableUtils.writeVInt(output, indexedColumnsInfo.size());
+        for (Pair<String, String> colInfo : indexedColumnsInfo) {
+            Bytes.writeByteArray(output, colInfo.getFirst() == null ? null : 
colInfo.getFirst().getBytes());
+            Bytes.writeByteArray(output, colInfo.getSecond().getBytes());
+        }
+        WritableUtils.writeVInt(output, coveredColumnsInfo.size());
+        for (Pair<String, String> colInfo : coveredColumnsInfo) {
+            Bytes.writeByteArray(output, colInfo.getFirst() == null ? null : 
colInfo.getFirst().getBytes());
+            Bytes.writeByteArray(output, colInfo.getSecond().getBytes());
+        }
+        WritableUtils.writeVInt(output, storeColsInSingleCell ? 1 : -1);
     }
 
     public int getEstimatedByteSize() {
@@ -1241,16 +1423,9 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
      * Init calculated state reading/creating
      */
     private void initCachedState() {
-        dataEmptyKeyValueRef =
-                new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(),
-                        QueryConstants.EMPTY_COLUMN_BYTES);
-
-        indexQualifiers = 
Lists.newArrayListWithExpectedSize(this.coveredColumns.size());
-        for (ColumnReference ref : coveredColumns) {
-            indexQualifiers.add(new 
ImmutableBytesPtr(IndexUtil.getIndexColumnName(
-                ref.getFamily(), ref.getQualifier())));
-        }
-
+        byte[] emptyKvQualifier = 
EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst();
+        dataEmptyKeyValueRef = new 
ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier);
+        emptyKeyValueQualifierPtr = new ImmutableBytesPtr(emptyKvQualifier);
         this.allColumns = 
Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + 
coveredColumns.size());
         // columns that are required to evaluate all expressions in 
indexedExpressions (not including columns in data row key)
         this.indexedColumns = 
Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size());
@@ -1258,7 +1433,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                KeyValueExpressionVisitor visitor = new 
KeyValueExpressionVisitor() {
                 @Override
                 public Void visit(KeyValueColumnExpression expression) {
-                       if (indexedColumns.add(new 
ColumnReference(expression.getColumnFamily(), expression.getColumnName()))) {
+                       if (indexedColumns.add(new 
ColumnReference(expression.getColumnFamily(), 
expression.getColumnQualifier()))) {
                                
indexedColumnTypes.add(expression.getDataType());
                        }
                     return null;
@@ -1523,4 +1698,26 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             return udfParseNodes;
         }
     }
+    
+    public byte[] getEmptyKeyValueQualifier() {
+        return emptyKeyValueQualifierPtr.copyBytes();
+    }
+    
+    public Set<Pair<String, String>> getCoveredColumnInfo() {
+        return coveredColumnsInfo;
+    }
+    
+    public Set<Pair<String, String>> getIndexedColumnInfo() {
+        return indexedColumnsInfo;
+    }
+    
+    public StorageScheme getIndexStorageScheme() {
+        if (storeColsInSingleCell) {
+            return StorageScheme.COLUMNS_STORED_IN_SINGLE_CELL;
+        }
+        if (usesEncodedColumnNames) {
+            return StorageScheme.ENCODED_COLUMN_NAMES;
+        }
+        return StorageScheme.NON_ENCODED_COLUMN_NAMES;
+    }
 }
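
The write() changes above pack a boolean into the sign of a serialized size
(usesEncodedColumnNames into indexedExpressions.size(), indexWALDisabled into
nDataCFs, immutableRows into estimatedIndexRowKeyBytes). A standalone sketch of
that round trip, assuming only Hadoop's WritableUtils; the class and method
names below are illustrative and not part of the patch:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableUtils;

    class SignEncodedSize {
        // Writes (size + 1) with a sign carrying the flag; the +1 keeps a zero size signed.
        static void write(DataOutput out, int size, boolean flag) throws IOException {
            WritableUtils.writeVInt(out, (size + 1) * (flag ? 1 : -1));
        }

        // Reads the vint back, recovering the flag from the sign and the size from the magnitude.
        static boolean readFlag(DataInput in, int[] sizeOut) throws IOException {
            int encoded = WritableUtils.readVInt(in);
            sizeOut[0] = Math.abs(encoded) - 1;
            return encoded > 0;
        }
    }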

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index bf1d0fb..05211c0 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -166,7 +166,7 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
             ExpressionVisitor<Void> visitor = new 
StatelessTraverseAllExpressionVisitor<Void>() {
                 @Override
                 public Void visit(KeyValueColumnExpression expression) {
-                    get.addColumn(expression.getColumnFamily(), 
expression.getColumnName());
+                    get.addColumn(expression.getColumnFamily(), 
expression.getColumnQualifier());
                     estimatedSizeHolder[0]++;
                     return null;
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index eb73d6b..d382005 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -158,12 +158,13 @@ public class PhoenixIndexFailurePolicy extends 
DelegateIndexFailurePolicy {
                     
.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, 
env.getConfiguration()));
             // Mimic the Put that gets generated by the client on an update of 
the index state
             Put put = new Put(indexTableKey);
-            if (blockWriteRebuildIndex) 
+            if (blockWriteRebuildIndex) {
                 put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
                         PIndexState.ACTIVE.getSerializedBytes());
-            else  
+            } else {  
                 put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
                         PIndexState.DISABLE.getSerializedBytes());
+            }
             put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
                 PLong.INSTANCE.toBytes(minTimeStamp));
             final List<Mutation> tableMetadata = 
Collections.<Mutation>singletonList(put);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index c67da6e..9ee5ea7 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -67,7 +67,6 @@ import 
org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
@@ -304,8 +303,16 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
                 for (ColumnReference ref : mutableColumns) {
                     scan.addColumn(ref.getFamily(), ref.getQualifier());
                 }
+                /*
+                 * Indexes inherit the storage scheme of the data table, which means
+                 * all the indexes have the same storage scheme and empty key value
+                 * qualifier. Note that this assumption would break if we started
+                 * supporting new indexes over existing data tables with a storage
+                 * scheme different from the data table's.
+                 */
+                byte[] emptyKeyValueQualifier = 
indexMaintainers.get(0).getEmptyKeyValueQualifier();
+                
                 // Project empty key value column
-                
scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), 
QueryConstants.EMPTY_COLUMN_BYTES);
+                
scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), 
emptyKeyValueQualifier);
                 ScanRanges scanRanges = 
ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, 
Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, 
KeyRange.EVERYTHING_RANGE, null, true, -1);
                 scanRanges.initializeScan(scan);
                 TableName tableName = 
env.getRegion().getRegionInfo().getTable();
@@ -356,7 +363,8 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             Map<ImmutableBytesPtr, MultiMutation> 
mutationsToFindPreviousValue) throws IOException {
         if (scanner != null) {
             Result result;
-            ColumnReference emptyColRef = new 
ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(),
 QueryConstants.EMPTY_COLUMN_BYTES);
+            ColumnReference emptyColRef = new 
ColumnReference(indexMetaData.getIndexMaintainers().get(0)
+                    .getDataEmptyKeyValueCF(), 
indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
             // Process existing data table rows by removing the old index row 
and adding the new index row
             while ((result = scanner.next()) != null) {
                 Mutation m = mutationsToFindPreviousValue.remove(new 
ImmutableBytesPtr(result.getRow()));
@@ -384,7 +392,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             // to generate point delete markers for all index rows that were 
added. We don't have Tephra
             // manage index rows in change sets because we don't want to be 
hit with the additional
             // memory hit and do not need to do conflict detection on index 
rows.
-            ColumnReference emptyColRef = new 
ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(),
 QueryConstants.EMPTY_COLUMN_BYTES);
+            ColumnReference emptyColRef = new 
ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(),
 indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
             while ((result = scanner.next()) != null) {
                 Mutation m = mutations.remove(new 
ImmutableBytesPtr(result.getRow()));
                 // Sort by timestamp, type, cf, cq so we can process in time 
batches from oldest to newest

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index c0f9707..cdd9cbc 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -17,12 +17,18 @@
  */
 package org.apache.phoenix.iterate;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
 import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
+import static 
org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
+import static org.apache.phoenix.schema.PTable.IndexType.LOCAL;
+import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+import static 
org.apache.phoenix.util.EncodedColumnsUtil.getEncodedColumnQualifier;
+import static org.apache.phoenix.util.ScanUtil.setQualifierRanges;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
@@ -78,20 +84,25 @@ import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTable.StorageScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
+import org.apache.phoenix.schema.types.PBoolean;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PrefixByteCodec;
 import org.apache.phoenix.util.PrefixByteDecoder;
@@ -159,7 +170,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         return true;
     }
     
-    private static void initializeScan(QueryPlan plan, Integer perScanLimit, 
Integer offset, Scan scan) {
+    private static void initializeScan(QueryPlan plan, Integer perScanLimit, 
Integer offset, Scan scan) throws SQLException {
         StatementContext context = plan.getContext();
         TableRef tableRef = plan.getTableRef();
         PTable table = tableRef.getTable();
@@ -210,7 +221,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                             // Project empty key value unless the column 
family containing it has
                             // been projected in its entirety.
                             if (!familyMap.containsKey(ecf) || 
familyMap.get(ecf) != null) {
-                                scan.addColumn(ecf, 
QueryConstants.EMPTY_COLUMN_BYTES);
+                                scan.addColumn(ecf, 
EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst());
                             }
                         }
                     }
@@ -228,7 +239,6 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
             if(offset!=null){
                 ScanUtil.addOffsetAttribute(scan, offset);
             }
-
             int cols = plan.getGroupBy().getOrderPreservingColumnCount();
             if (cols > 0 && keyOnlyFilter &&
                 
!plan.getStatement().getHint().hasHint(HintNode.Hint.RANGE_SCAN) &&
@@ -243,13 +253,86 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                     ScanUtil.andFilterAtEnd(scan, new 
PageFilter(plan.getLimit()));
                 }
             }
-
+            // When analyzing the table, no key value look up is done,
+            // so there is no point in setting the range.
+            if (setQualifierRanges(table) && !ScanUtil.isAnalyzeTable(scan)) {
+                Pair<Integer, Integer> range = getEncodedQualifierRange(scan, 
context);
+                if (range != null) {
+                    scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER, 
getEncodedColumnQualifier(range.getFirst()));
+                    scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER, 
getEncodedColumnQualifier(range.getSecond()));
+                }
+            }
             if (optimizeProjection) {
                 optimizeProjection(context, scan, table, statement);
             }
         }
     }
+    
+    private static Pair<Integer, Integer> getEncodedQualifierRange(Scan scan, 
StatementContext context)
+            throws SQLException {
+        PTable table = context.getCurrentTable().getTable();
+        StorageScheme storageScheme = table.getStorageScheme();
+        checkArgument(storageScheme == StorageScheme.ENCODED_COLUMN_NAMES,
+            "Method should only be used for tables using encoded column 
names");
+        Pair<Integer, Integer> minMaxQualifiers = new Pair<>();
+        for (Pair<byte[], byte[]> whereCol : 
context.getWhereConditionColumns()) {
+            byte[] cq = whereCol.getSecond();
+            if (cq != null) {
+                int qualifier = getEncodedColumnQualifier(cq);
+                determineQualifierRange(qualifier, minMaxQualifiers);
+            }
+        }
+        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+
+        Map<String, Pair<Integer, Integer>> qualifierRanges = 
SchemaUtil.getQualifierRanges(table);
+        for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) 
{
+            if (entry.getValue() != null) {
+                for (byte[] cq : entry.getValue()) {
+                    if (cq != null) {
+                        int qualifier = getEncodedColumnQualifier(cq);
+                        determineQualifierRange(qualifier, minMaxQualifiers);
+                    }
+                }
+            } else {
+                /*
+                 * All the columns of the column family are being projected. 
So we will need to
+                 * consider all the columns in the column family to determine 
the min-max range.
+                 */
+                String family = Bytes.toString(entry.getKey());
+                if (table.getType() == INDEX && table.getIndexType() == LOCAL 
&& !IndexUtil.isLocalIndexFamily(family)) {
+                    //TODO: samarth confirm with James why we need this hack here :(
+                    family = IndexUtil.getLocalIndexColumnFamily(family);
+                }
+                Pair<Integer, Integer> range = qualifierRanges.get(family);
+                determineQualifierRange(range.getFirst(), minMaxQualifiers);
+                determineQualifierRange(range.getSecond(), minMaxQualifiers);
+            }
+        }
+        if (minMaxQualifiers.getFirst() == null) {
+            return null;
+        }
+        return minMaxQualifiers;
+    }
 
+    /**
+     * Widens the running min-max qualifier range to include the given qualifier.
+     * @param qualifier encoded column qualifier to include
+     * @param minMaxQualifiers running min and max qualifiers, updated in place
+     */
+    private static void determineQualifierRange(Integer qualifier, 
Pair<Integer, Integer> minMaxQualifiers) {
+        if (minMaxQualifiers.getFirst() == null) {
+            minMaxQualifiers.setFirst(qualifier);
+            minMaxQualifiers.setSecond(qualifier);
+        } else {
+            if (minMaxQualifiers.getFirst() > qualifier) {
+                minMaxQualifiers.setFirst(qualifier);
+            } else if (minMaxQualifiers.getSecond() < qualifier) {
+                minMaxQualifiers.setSecond(qualifier);
+            }
+        }
+    }
+    
     private static void optimizeProjection(StatementContext context, Scan 
scan, PTable table, FilterableStatement statement) {
         Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
         // columnsTracker contain cf -> qualifiers which should get returned.
@@ -346,7 +429,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
             // the ExplicitColumnTracker not to be used, though.
             if (!statement.isAggregate() && filteredColumnNotInProjection) {
                 ScanUtil.andFilterAtEnd(scan, new 
ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
-                        columnsTracker, conditionOnlyCfs));
+                        columnsTracker, conditionOnlyCfs, 
EncodedColumnsUtil.usesEncodedColumnNames(table.getStorageScheme())));
             }
         }
     }
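
The getEncodedQualifierRange()/determineQualifierRange() additions above boil
down to tracking one running [min, max] interval over all referenced encoded
qualifiers. A minimal, self-contained sketch of that accumulation; the class
name and sample values are made up:

    import org.apache.hadoop.hbase.util.Pair;

    class QualifierRangeSketch {
        // Widens the running [min, max] range to include one encoded qualifier.
        static void include(int qualifier, Pair<Integer, Integer> minMax) {
            if (minMax.getFirst() == null) {
                minMax.setFirst(qualifier);
                minMax.setSecond(qualifier);
            } else if (qualifier < minMax.getFirst()) {
                minMax.setFirst(qualifier);
            } else if (qualifier > minMax.getSecond()) {
                minMax.setSecond(qualifier);
            }
        }

        public static void main(String[] args) {
            Pair<Integer, Integer> minMax = new Pair<>();
            for (int q : new int[] { 12, 15, 11 }) {
                include(q, minMax);
            }
            // minMax is now [11, 15]; the patch serializes such bounds into the
            // scan's MIN_QUALIFIER / MAX_QUALIFIER attributes.
            System.out.println(minMax.getFirst() + ".." + minMax.getSecond());
        }
    }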

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
index 3293f65..1e5f09e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -49,7 +49,7 @@ abstract public class LookAheadResultIterator implements 
PeekingResultIterator {
         };
     }
     
-    private final static Tuple UNINITIALIZED = new ResultTuple();
+    private final static Tuple UNINITIALIZED = ResultTuple.EMPTY_TUPLE;
     private Tuple next = UNINITIALIZED;
     
     abstract protected Tuple advance() throws SQLException;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
index 8ada952..135ab26 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
@@ -180,6 +180,7 @@ public abstract class MappedByteBufferQueue<T> extends 
AbstractQueue<T> {
             return this.index;
         }
         
+        @Override
         public int size() {
             if (flushBuffer)
                 return flushedCount;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 8dcb2e8..e4c52c0 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.SizedUtil;
 
 import com.google.common.base.Function;
@@ -264,7 +265,7 @@ public class OrderedResultIterator implements 
PeekingResultIterator {
             }
             this.byteSize = queueEntries.getByteSize();
         } catch (IOException e) {
-            throw new SQLException("", e);
+            ServerUtil.createIOException(e.getMessage(), e);
         } finally {
             delegate.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index 88e141a..816b78c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -24,16 +24,24 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 
 public class RegionScannerResultIterator extends BaseResultIterator {
     private final RegionScanner scanner;
+    private final Pair<Integer, Integer> minMaxQualifiers;
+    private final boolean useQualifierAsIndex;
     
-    public RegionScannerResultIterator(RegionScanner scanner) {
+    public RegionScannerResultIterator(RegionScanner scanner, Pair<Integer, 
Integer> minMaxQualifiers) {
         this.scanner = scanner;
+        this.useQualifierAsIndex = 
ScanUtil.useQualifierAsIndex(minMaxQualifiers);
+        this.minMaxQualifiers = minMaxQualifiers;
     }
     
     @Override
@@ -43,7 +51,7 @@ public class RegionScannerResultIterator extends 
BaseResultIterator {
         synchronized (scanner) {
             try {
                 // TODO: size
-                List<Cell> results = new ArrayList<Cell>();
+                List<Cell> results = useQualifierAsIndex ? new 
EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), 
minMaxQualifiers.getSecond()) :  new ArrayList<Cell>();
                 // Results are potentially returned even when the return value 
of s.next is false
                 // since this is an indication of whether or not there are 
more values after the
                 // ones returned
@@ -53,7 +61,7 @@ public class RegionScannerResultIterator extends 
BaseResultIterator {
                 }
                 // We instantiate a new tuple because in all cases currently 
we hang on to it
                 // (i.e. to compute and hold onto the TopN).
-                MultiKeyValueTuple tuple = new MultiKeyValueTuple();
+                Tuple tuple = useQualifierAsIndex ? new 
PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
                 tuple.setKeyValues(results);
                 return tuple;
             } catch (IOException e) {
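
RegionScannerResultIterator now chooses a qualifier-indexed cell list whenever
the scan carries a min/max qualifier range. A rough sketch of the underlying
idea, not the actual EncodedColumnQualiferCellsList implementation (names and
behavior simplified):

    import org.apache.hadoop.hbase.Cell;

    // When qualifiers are small encoded integers within a known [min, max] range,
    // each cell can live in an array slot derived from its qualifier, so lookups
    // become O(1) array accesses instead of searches over a sorted list.
    class PositionIndexedCells {
        private final Cell[] slots;
        private final int minQualifier;

        PositionIndexedCells(int minQualifier, int maxQualifier) {
            this.minQualifier = minQualifier;
            this.slots = new Cell[maxQualifier - minQualifier + 1];
        }

        void add(int qualifier, Cell cell) {
            slots[qualifier - minQualifier] = cell;
        }

        Cell get(int qualifier) {
            return slots[qualifier - minQualifier];
        }
    }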

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index b4f2adc..bc81447 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -205,7 +205,7 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     public static final byte[] BASE_COLUMN_COUNT_BYTES = 
Bytes.toBytes(BASE_COLUMN_COUNT);
     public static final String IS_ROW_TIMESTAMP = "IS_ROW_TIMESTAMP";
     public static final byte[] IS_ROW_TIMESTAMP_BYTES = 
Bytes.toBytes(IS_ROW_TIMESTAMP);
-
+    
     public static final String TABLE_FAMILY = 
QueryConstants.DEFAULT_COLUMN_FAMILY;
     public static final byte[] TABLE_FAMILY_BYTES = 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
 
@@ -320,6 +320,13 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     /** Version below which we fall back on the generic KeyValueBuilder */
     public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = 
VersionUtil.encodeVersion("0", "94", "14");
     
+    public static final String STORAGE_SCHEME = "STORAGE_SCHEME";
+    public static final byte[] STORAGE_SCHEME_BYTES = 
Bytes.toBytes(STORAGE_SCHEME);
+    public static final String ENCODED_COLUMN_QUALIFIER = "COLUMN_QUALIFIER";
+    public static final byte[] ENCODED_COLUMN_QUALIFIER_BYTES = 
Bytes.toBytes(ENCODED_COLUMN_QUALIFIER);
+    public static final String COLUMN_QUALIFIER_COUNTER = "QUALIFIER_COUNTER";
+    public static final byte[] COLUMN_QUALIFIER_COUNTER_BYTES = 
Bytes.toBytes(COLUMN_QUALIFIER_COUNTER);
+
     PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException {
         this.emptyResultSet = new 
PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, 
new StatementContext(new PhoenixStatement(connection), false));
         this.connection = connection;
@@ -593,9 +600,8 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
                 newCells.addAll(cells);
                 newCells.add(kv);
                 Collections.sort(newCells, KeyValue.COMPARATOR);
-                resultTuple.setResult(Result.create(newCells));
+                tuple = new ResultTuple(Result.create(newCells));
             }
-
             return tuple;
         }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 47c17ae..3ca48a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -107,7 +107,7 @@ public class PhoenixResultSet implements ResultSet, 
SQLCloseable {
     private final static String STRING_FALSE = "0";
     private final static BigDecimal BIG_DECIMAL_FALSE = BigDecimal.valueOf(0);
     private final static Integer INTEGER_FALSE = Integer.valueOf(0);
-    private final static Tuple BEFORE_FIRST = new ResultTuple();
+    private final static Tuple BEFORE_FIRST = ResultTuple.EMPTY_TUPLE;
 
     private final ResultIterator scanner;
     private final RowProjector rowProjector;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java 
b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index 908a117..2d7550a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -122,6 +122,7 @@ public class HashCacheFactory implements ServerCacheFactory 
{
                     int resultSize = (int)Bytes.readVLong(hashCacheByteArray, 
offset);
                     offset += 
WritableUtils.decodeVIntSize(hashCacheByteArray[offset]);
                     ImmutableBytesWritable value = new 
ImmutableBytesWritable(hashCacheByteArray,offset,resultSize);
+                    //TODO: samarth make joins work with position look up.
                     Tuple result = new ResultTuple(ResultUtil.toResult(value));
                     ImmutableBytesPtr key = 
TupleUtil.getConcatenatedValue(result, onExpressions);
                     List<Tuple> tuples = hashCacheMap.get(key);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index b12326a..a6a57c7 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -49,6 +49,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -208,7 +209,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> 
extends Mapper<LongWri
     not care about it
      */
     private void initColumnIndexes() throws SQLException {
-        columnIndexes = new TreeMap(Bytes.BYTES_COMPARATOR);
+        columnIndexes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
         int columnIndex = 0;
         for(int index = 0; index < logicalNames.size(); index++) {
             PTable table = PhoenixRuntime.getTable(conn, 
logicalNames.get(index));
@@ -216,18 +217,23 @@ public abstract class FormatToBytesWritableMapper<RECORD> 
extends Mapper<LongWri
             for (int i = 0; i < cls.size(); i++) {
                 PColumn c = cls.get(i);
                 byte[] family = new byte[0];
-                if (c.getFamilyName() != null)  // Skip PK column
+                byte[] cq;
+                if (!SchemaUtil.isPKColumn(c)) {
                     family = c.getFamilyName().getBytes();
-                byte[] name = c.getName().getBytes();
-                byte[] cfn = Bytes.add(family, 
QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
+                    cq = EncodedColumnsUtil.getColumnQualifier(c, table);
+                } else {
+                    // TODO: samarth verify if this is the right thing to do 
here.
+                    cq = c.getName().getBytes();
+                }
+                byte[] cfn = Bytes.add(family, 
QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
                 if (!columnIndexes.containsKey(cfn)) {
                     columnIndexes.put(cfn, new Integer(columnIndex));
                     columnIndex++;
                 }
             }
             byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
-            byte[] cfn = Bytes.add(emptyColumnFamily, 
QueryConstants.NAMESPACE_SEPARATOR_BYTES,
-                    QueryConstants.EMPTY_COLUMN_BYTES);
+            byte[] emptyKeyValue = 
EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+            byte[] cfn = Bytes.add(emptyColumnFamily, 
QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue);
             columnIndexes.put(cfn, new Integer(columnIndex));
             columnIndex++;
         }
@@ -243,9 +249,9 @@ public abstract class FormatToBytesWritableMapper<RECORD> 
extends Mapper<LongWri
     private int findIndex(Cell cell) throws IOException {
         byte[] familyName = Bytes.copy(cell.getFamilyArray(), 
cell.getFamilyOffset(),
                 cell.getFamilyLength());
-        byte[] name = Bytes.copy(cell.getQualifierArray(), 
cell.getQualifierOffset(),
+        byte[] cq = Bytes.copy(cell.getQualifierArray(), 
cell.getQualifierOffset(),
                 cell.getQualifierLength());
-        byte[] cfn = Bytes.add(familyName, 
QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
+        byte[] cfn = Bytes.add(familyName, 
QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
         if(columnIndexes.containsKey(cfn)) {
             return columnIndexes.get(cfn);
         }
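
The mapper keys its column index map by family + separator + qualifier, and
findIndex() rebuilds the same key from each Cell. A small sketch of that key
construction and lookup, using a ":" separator as a stand-in for
QueryConstants.NAMESPACE_SEPARATOR_BYTES; the class and method names are
illustrative only:

    import java.util.TreeMap;
    import org.apache.hadoop.hbase.util.Bytes;

    class ColumnIndexKeySketch {
        private static final byte[] SEP = Bytes.toBytes(":");
        private final TreeMap<byte[], Integer> columnIndexes =
                new TreeMap<>(Bytes.BYTES_COMPARATOR);

        // Registers a column under its family:qualifier key, as initColumnIndexes() does.
        void register(byte[] family, byte[] qualifier, int index) {
            columnIndexes.put(Bytes.add(family, SEP, qualifier), index);
        }

        // Looks a column back up from family/qualifier bytes, as findIndex() does per Cell.
        Integer lookup(byte[] family, byte[] qualifier) {
            return columnIndexes.get(Bytes.add(family, SEP, qualifier));
        }
    }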
