Repository: phoenix
Updated Branches:
  refs/heads/encodecolumns2 a4b58c070 -> 42d049275


http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 3c45295..7b0451a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -371,7 +371,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         }
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         IndexMaintainer.serialize(dataTable, ptr, indexes, 
context.getConnection());
-        scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, 
ByteUtil.copyKeyBytesIfNecessary(ptr));
+        scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, 
ByteUtil.copyKeyBytesIfNecessary(ptr));
         if (dataTable.isTransactional()) {
             scan.setAttribute(BaseScannerRegionObserver.TX_STATE, 
context.getConnection().getMutationState().encodeTransaction());
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index a030150..4da26c2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -1191,7 +1191,7 @@ public class MutationState implements SQLCloseable {
             }
             mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
             if (attribValue != null) {
-                mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, 
attribValue);
                 if (txState.length > 0) {
                     mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, 
txState);
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 99b6bf8..d3a3ca4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -17,7 +17,10 @@
  */
 package org.apache.phoenix.index;
 
+import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
@@ -29,6 +32,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +48,7 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.Writable;
@@ -52,6 +57,8 @@ import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.IndexExpressionCompiler;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.generated.ServerCachingProtos;
+import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ColumnInfo;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
@@ -116,10 +123,10 @@ import com.google.common.collect.Sets;
  * row and caches any covered columns. Client-side serializes into byte array 
using 
  * @link #serialize(PTable, ImmutableBytesWritable)}
  * and transmits to server-side through either the 
- * {@link org.apache.phoenix.index.PhoenixIndexCodec#INDEX_MD}
+ * {@link org.apache.phoenix.index.PhoenixIndexCodec#INDEX_PROTO_MD}
  * Mutation attribute or as a separate RPC call using 
  * {@link org.apache.phoenix.cache.ServerCacheClient})
- *
+ * 
  * 
  * @since 2.1.0
  */
@@ -127,8 +134,8 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
 
     private static final int EXPRESSION_NOT_PRESENT = -1;
     private static final int ESTIMATED_EXPRESSION_SIZE = 8;
-
-       public static IndexMaintainer create(PTable dataTable, PTable index, 
PhoenixConnection connection) {
+    
+    public static IndexMaintainer create(PTable dataTable, PTable index, 
PhoenixConnection connection) {
         if (dataTable.getType() == PTableType.INDEX || index.getType() != 
PTableType.INDEX || !dataTable.getIndexes().contains(index)) {
             throw new IllegalArgumentException();
         }
@@ -190,14 +197,12 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             }
         }
         int nIndexes = 0;
-        int estimatedSize = dataTable.getRowKeySchema().getEstimatedByteSize() 
+ 2;
         while (indexesItr.hasNext()) {
             nIndexes++;
-            PTable index = indexesItr.next();
-            estimatedSize += index.getIndexMaintainer(dataTable, 
connection).getEstimatedByteSize();
+            indexesItr.next();
         }
-        TrustedByteArrayOutputStream stream = new 
TrustedByteArrayOutputStream(estimatedSize + 1);
-        DataOutput output = new DataOutputStream(stream);
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        DataOutputStream output = new DataOutputStream(stream);
         try {
             // Encode data table salting in sign of number of indexes
             WritableUtils.writeVInt(output, nIndexes * 
(dataTable.getBucketNum() == null ? 1 : -1));
@@ -207,15 +212,23 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                     dataTable.isImmutableRows() ? 
enabledLocalIndexIterator(indexes.iterator())
                             : nonDisabledIndexIterator(indexes.iterator());
             while (indexesItr.hasNext()) {
-                    indexesItr.next().getIndexMaintainer(dataTable, 
connection).write(output);
+                    
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer 
proto = IndexMaintainer.toProto(indexesItr.next().getIndexMaintainer(dataTable, 
connection));
+                    byte[] protoBytes = proto.toByteArray();
+                    WritableUtils.writeVInt(output, protoBytes.length);
+                    output.write(protoBytes);
             }
         } catch (IOException e) {
             throw new RuntimeException(e); // Impossible
         }
-        ptr.set(stream.getBuffer(), 0, stream.size());
+        ptr.set(stream.toByteArray(), 0, stream.size());
     }
     
-
+    /**
+     * For client-side to append serialized IndexMaintainers of keyValueIndexes
+     * @param dataTable data table
+     * @param indexMetaDataPtr bytes pointer to hold returned serialized value
+     * @param keyValueIndexes indexes to serialize
+     */
     public static void serializeAdditional(PTable table, 
ImmutableBytesWritable indexMetaDataPtr,
             List<PTable> keyValueIndexes, PhoenixConnection connection) {
         int nMutableIndexes = indexMetaDataPtr.getLength() == 0 ? 0 : 
ByteUtil.vintFromBytes(indexMetaDataPtr);
@@ -241,7 +254,10 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             }
             // Serialize mutable indexes afterwards
             for (PTable index : keyValueIndexes) {
-                index.getIndexMaintainer(table, connection).write(output);
+                IndexMaintainer maintainer = index.getIndexMaintainer(table, 
connection);
+                byte[] protoBytes = 
IndexMaintainer.toProto(maintainer).toByteArray();
+                WritableUtils.writeVInt(output, protoBytes.length);
+                output.write(protoBytes);
             }
         } catch (IOException e) {
             throw new RuntimeException(e); // Impossible
@@ -250,15 +266,15 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     }
     
     public static List<IndexMaintainer> deserialize(ImmutableBytesWritable 
metaDataPtr,
-            KeyValueBuilder builder) {
-        return deserialize(metaDataPtr.get(), metaDataPtr.getOffset(), 
metaDataPtr.getLength());
+            KeyValueBuilder builder, boolean useProtoForIndexMaintainer) {
+        return deserialize(metaDataPtr.get(), metaDataPtr.getOffset(), 
metaDataPtr.getLength(), useProtoForIndexMaintainer);
     }
     
-    public static List<IndexMaintainer> deserialize(byte[] buf) {
-        return deserialize(buf, 0, buf.length);
+    public static List<IndexMaintainer> deserialize(byte[] buf, boolean 
useProtoForIndexMaintainer) {
+        return deserialize(buf, 0, buf.length, useProtoForIndexMaintainer);
     }
 
-    public static List<IndexMaintainer> deserialize(byte[] buf, int offset, 
int length) {
+    private static List<IndexMaintainer> deserialize(byte[] buf, int offset, 
int length, boolean useProtoForIndexMaintainer) {
         ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, 
length);
         DataInput input = new DataInputStream(stream);
         List<IndexMaintainer> maintainers = Collections.emptyList();
@@ -270,31 +286,31 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             rowKeySchema.readFields(input);
             maintainers = Lists.newArrayListWithExpectedSize(size);
             for (int i = 0; i < size; i++) {
-                IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, 
isDataTableSalted);
-                maintainer.readFields(input);
-                maintainers.add(maintainer);
+                if (useProtoForIndexMaintainer) {
+                  int protoSize = WritableUtils.readVInt(input);
+                  byte[] b = new byte[protoSize];
+                  input.readFully(b);
+                  
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer 
proto = ServerCachingProtos.IndexMaintainer.parseFrom(b);
+                  maintainers.add(IndexMaintainer.fromProto(proto, 
rowKeySchema, isDataTableSalted));
+                } else {
+                    IndexMaintainer maintainer = new 
IndexMaintainer(rowKeySchema, isDataTableSalted);
+                    maintainer.readFields(input);
+                    maintainers.add(maintainer);
+                }
             }
         } catch (IOException e) {
             throw new RuntimeException(e); // Impossible
         }
         return maintainers;
     }
-
+    
     private byte[] viewIndexId;
     private boolean isMultiTenant;
     // indexed expressions that are not present in the row key of the data 
table, the expression can also refer to a regular column
     private List<Expression> indexedExpressions;
     // columns required to evaluate all expressions in indexedExpressions 
(this does not include columns in the data row key)
     private Set<ColumnReference> indexedColumns;
-    private Set<ColumnReference> coveredColumns;
-    // Information for columns of data tables that are being indexed. The 
first part of the pair is column family and second part is the column name. 
-    private Set<Pair<String, String>> indexedColumnsInfo;
-    // Information for columns of data tables that are being covered by the 
index. The first part of the pair is column family and second part is the 
column name.
-    private Set<Pair<String, String>> coveredColumnsInfo;
-    // Map of covered columns where a key is column reference for a column in 
the data table
-    // and value is column reference for corresponding column in the index 
table.
-    // TODO: samarth confirm that we don't need a separate map for tracking 
column families of local indexes.
-    private Map<ColumnReference, ColumnReference> coveredColumnsMap;
+    
     // columns required to create index row i.e. indexedColumns + 
coveredColumns  (this does not include columns in the data row key)
     private Set<ColumnReference> allColumns;
     // TODO remove this in the next major release
@@ -308,7 +324,6 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     private boolean indexWALDisabled;
     private boolean isLocalIndex;
     private boolean immutableRows;
-
     // Transient state
     private final boolean isDataTableSalted;
     private final RowKeySchema dataRowKeySchema;
@@ -319,15 +334,30 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     private int maxTrailingNulls;
     private ColumnReference dataEmptyKeyValueRef;
     private boolean rowKeyOrderOptimizable;
-    private ImmutableBytesPtr emptyKeyValueQualifierPtr;
+    
+    /**** START: New member variables added in 4.10 *****/ 
     private QualifierEncodingScheme encodingScheme;
     private ImmutableStorageScheme immutableStorageScheme;
-    
+    /*
+     * Information for columns of data tables that are being indexed. The 
first part of the pair is column family name
+     * and second part is the column name. The reason we need to track this 
state is because for certain storage schemes
+     * like ImmutableStorageScheme#SINGLE_CELL_ARRAY_WITH_OFFSETS, the column 
for which we need to generate an index
+     * table put/delete is different from the columns that are indexed in the 
phoenix schema. This information helps us
+     * determine whether or not certain operations like DROP COLUMN should 
impact the index.
+     */
+    private Set<Pair<String, String>> indexedColumnsInfo;
+    /*
+     * Map of covered columns where a key is column reference for a column in 
the data table
+     * and value is column reference for corresponding column in the index 
table.
+     */
+    private Map<ColumnReference, ColumnReference> coveredColumnsMap;
+    /**** END: New member variables added in 4.10 *****/
+
     private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean 
isDataTableSalted) {
         this.dataRowKeySchema = dataRowKeySchema;
         this.isDataTableSalted = isDataTableSalted;
     }
-
+    
     private IndexMaintainer(final PTable dataTable, final PTable index, 
PhoenixConnection connection) {
         this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null);
         assert(dataTable.getType() == PTableType.SYSTEM || dataTable.getType() 
== PTableType.TABLE || dataTable.getType() == PTableType.VIEW);
@@ -336,7 +366,11 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         this.viewIndexId = index.getViewIndexId() == null ? null : 
MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId());
         this.isLocalIndex = index.getIndexType() == IndexType.LOCAL;
         this.encodingScheme = index.getEncodingScheme();
-        this.immutableStorageScheme = index.getImmutableStorageScheme();
+        
+        // null check for b/w compatibility
+        this.encodingScheme = index.getEncodingScheme() == null ? 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : index.getEncodingScheme();
+        this.immutableStorageScheme = index.getImmutableStorageScheme() == 
null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : 
index.getImmutableStorageScheme();
+        
         byte[] indexTableName = index.getPhysicalName().getBytes();
         // Use this for the nDataSaltBuckets as we need this for local indexes
         // TODO: persist nDataSaltBuckets separately, but maintain b/w compat.
@@ -385,7 +419,6 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         this.indexTableName = indexTableName;
         this.indexedColumnTypes = 
Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
         this.indexedExpressions = 
Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
-        this.coveredColumns = 
Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns);
         this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns 
- nIndexPKColumns);
         this.nIndexSaltBuckets  = nIndexSaltBuckets == null ? 0 : 
nIndexSaltBuckets;
         this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable);
@@ -417,7 +450,6 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         }
         StatementContext context = new StatementContext(new 
PhoenixStatement(connection), resolver);
         this.indexedColumnsInfo = 
Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns);
-        this.coveredColumnsInfo = 
Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns);
         
         IndexExpressionCompiler expressionIndexCompiler = new 
IndexExpressionCompiler(context);
         for (int i = indexPosOffset; i < index.getPKColumns().size(); i++) {
@@ -508,10 +540,8 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                 PColumn dataColumn = IndexUtil.getDataColumn(dataTable, 
indexColumn.getName().getString());
                 byte[] dataColumnCq = dataColumn.getColumnQualifierBytes();
                 byte[] indexColumnCq = indexColumn.getColumnQualifierBytes();
-                this.coveredColumns.add(new 
ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq));
                 this.coveredColumnsMap.put(new 
ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq), 
                         new 
ColumnReference(indexColumn.getFamilyName().getBytes(), indexColumnCq));
-                this.coveredColumnsInfo.add(new 
Pair<>(dataColumn.getFamilyName().getString(), 
dataColumn.getName().getString()));
             }
         }
         this.estimatedIndexRowKeyBytes = 
estimateIndexRowKeyByteSize(indexColByteSize);
@@ -918,27 +948,27 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         }
         return indexRowKeySchema;
     }
-
+    
     public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter 
valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] 
regionStartKey, byte[] regionEndKey) throws IOException {
-       byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, 
regionStartKey, regionEndKey);
-       Put put = null;
+        byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, 
regionStartKey, regionEndKey);
+        Put put = null;
         // New row being inserted: add the empty key value
         if (valueGetter.getLatestValue(dataEmptyKeyValueRef) == null) {
             put = new Put(indexRowKey);
             // add the keyvalue for the empty row
             put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
-                this.getEmptyKeyValueFamily(), emptyKeyValueQualifierPtr, ts,
+                this.getEmptyKeyValueFamily(), 
dataEmptyKeyValueRef.getQualifierWritable(), ts,
                 // set the value to the empty column name
-                emptyKeyValueQualifierPtr));
+                dataEmptyKeyValueRef.getQualifierWritable()));
             put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : 
Durability.SKIP_WAL);
         }
         ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
-        if (immutableStorageScheme != 
immutableStorageScheme.ONE_CELL_PER_COLUMN) {
+        if (immutableStorageScheme != 
ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
             // map from index column family to list of pair of index column 
and data column (for covered columns)
             Map<ImmutableBytesPtr, List<Pair<ColumnReference, 
ColumnReference>>> familyToColListMap = Maps.newHashMap();
             for (ColumnReference ref : this.getCoveredColumns()) {
-               ColumnReference indexColRef = this.coveredColumnsMap.get(ref);
-               ImmutableBytesPtr cf = new 
ImmutableBytesPtr(indexColRef.getFamily());
+                ColumnReference indexColRef = this.coveredColumnsMap.get(ref);
+                ImmutableBytesPtr cf = new 
ImmutableBytesPtr(indexColRef.getFamily());
                 if (!familyToColListMap.containsKey(cf)) {
                     familyToColListMap.put(cf, Lists.<Pair<ColumnReference, 
ColumnReference>>newArrayList());
                 }
@@ -956,39 +986,39 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                 Expression[] colValues = 
EncodedColumnsUtil.createColumnExpressionArray(maxEncodedColumnQualifier);
                 // set the values of the columns
                 for (Pair<ColumnReference, ColumnReference> colRefPair : 
colRefPairs) {
-                       ColumnReference indexColRef = colRefPair.getFirst();
-                       ColumnReference dataColRef = colRefPair.getSecond();
-                       Expression expression = new 
SingleCellColumnExpression(new PDatum() {
-                                               @Override
-                                               public boolean isNullable() {
-                                                       return false;
-                                               }
-                                               
-                                               @Override
-                                               public SortOrder getSortOrder() 
{
-                                                       return null;
-                                               }
-                                               
-                                               @Override
-                                               public Integer getScale() {
-                                                       return null;
-                                               }
-                                               
-                                               @Override
-                                               public Integer getMaxLength() {
-                                                       return null;
-                                               }
-                                               
-                                               @Override
-                                               public PDataType getDataType() {
-                                                       return null;
-                                               }
-                                       }, dataColRef.getFamily(), 
dataColRef.getQualifier(), encodingScheme);
-                       ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+                    ColumnReference indexColRef = colRefPair.getFirst();
+                    ColumnReference dataColRef = colRefPair.getSecond();
+                    Expression expression = new SingleCellColumnExpression(new 
PDatum() {
+                        @Override
+                        public boolean isNullable() {
+                            return false;
+                        }
+                        
+                        @Override
+                        public SortOrder getSortOrder() {
+                            return null;
+                        }
+                        
+                        @Override
+                        public Integer getScale() {
+                            return null;
+                        }
+                        
+                        @Override
+                        public Integer getMaxLength() {
+                            return null;
+                        }
+                        
+                        @Override
+                        public PDataType getDataType() {
+                            return null;
+                        }
+                    }, dataColRef.getFamily(), dataColRef.getQualifier(), 
encodingScheme);
+                    ImmutableBytesPtr ptr = new ImmutableBytesPtr();
                     expression.evaluate(new ValueGetterTuple(valueGetter), 
ptr);
                     byte[] value = ptr.copyBytesIfNecessary();
-                                       if (value != null) {
-                                               int indexArrayPos = 
encodingScheme.decode(indexColRef.getQualifier())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1;
+                    if (value != null) {
+                        int indexArrayPos = 
encodingScheme.decode(indexColRef.getQualifier())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1;
                         colValues[indexArrayPos] = new 
LiteralExpression(value);
                     }
                 }
@@ -1006,20 +1036,18 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                 //this is a little bit of extra work for installations that 
are running <0.94.14, but that should be rare and is a short-term set of 
wrappers - it shouldn't kill GC
                 put.add(kvBuilder.buildPut(rowKey, colFamilyPtr, 
QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR, ts, ptr));
             }
-        }
-        else {
-               for (ColumnReference ref : this.getCoveredColumns()) {
-                //FIXME: samarth figure out a backward compatible way to 
handle this as coveredcolumnsmap won't be availble for older phoenix clients.
+        } else {
+            for (ColumnReference ref : this.getCoveredColumns()) {
                 ColumnReference indexColRef = this.coveredColumnsMap.get(ref);
                 ImmutableBytesPtr cq = indexColRef.getQualifierWritable();
                 ImmutableBytesPtr cf = indexColRef.getFamilyWritable();
                 ImmutableBytesWritable value = valueGetter.getLatestValue(ref);
                 if (value != null) {
-                       if (put == null) {
+                    if (put == null) {
                         put = new Put(indexRowKey);
                         put.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL);
                     }
-                       put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value));
+                    put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value));
                 }
             }
         }
@@ -1097,7 +1125,6 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, 
Collections.<KeyValue>emptyList(), ts, null, null);
     }
     
-    @SuppressWarnings("deprecation")
     public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter 
oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> 
pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws 
IOException {
         byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, 
regionStartKey, regionEndKey);
         // Delete the entire row if any of the indexed columns changed
@@ -1107,7 +1134,6 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             Delete delete = new Delete(indexRowKey);
             
             for (ColumnReference ref : getCoveredColumns()) {
-                byte[] family = ref.getFamily();
                 ColumnReference indexColumn = coveredColumnsMap.get(ref);
                 // If table delete was single version, then index delete 
should be as well
                 if (deleteType == DeleteType.SINGLE_VERSION) {
@@ -1125,11 +1151,12 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             return delete;
         }
         Delete delete = null;
+        Set<ColumnReference> dataTableColRefs = coveredColumnsMap.keySet();
         // Delete columns for missing key values
         for (Cell kv : pendingUpdates) {
             if (kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
                 ColumnReference ref = new ColumnReference(kv.getFamily(), 
kv.getQualifier());
-                if (coveredColumns.contains(ref)) {
+                if (dataTableColRefs.contains(ref)) {
                     if (delete == null) {
                         delete = new Delete(indexRowKey);                    
                         delete.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL);
@@ -1137,9 +1164,6 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                     ColumnReference indexColumn = coveredColumnsMap.get(ref);
                     // If point delete for data table, then use point delete 
for index as well
                     if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { 
-                        //FIXME: samarth change this. Index column qualifiers 
are not derivable from data table cqs.
-                        // Figure out a backward compatible way of going this 
since coveredColumnsMap won't be available
-                        // for older clients.
                         delete.deleteColumn(indexColumn.getFamily(), 
indexColumn.getQualifier(), ts);
                     } else {
                         delete.deleteColumns(indexColumn.getFamily(), 
indexColumn.getQualifier(), ts);
@@ -1149,13 +1173,13 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         }
         return delete;
     }
-
+    
     public byte[] getIndexTableName() {
         return indexTableName;
     }
     
     public Set<ColumnReference> getCoveredColumns() {
-        return coveredColumns;
+        return coveredColumnsMap.keySet();
     }
 
     public Set<ColumnReference> getAllColumns() {
@@ -1168,7 +1192,8 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         // If if there are no covered columns, we know it's our default name
         return emptyKeyValueCFPtr;
     }
-
+    
+    @Deprecated // Only called by code older than our 4.10 release
     @Override
     public void readFields(DataInput input) throws IOException {
         int encodedIndexSaltBucketsAndMultiTenant = 
WritableUtils.readVInt(input);
@@ -1196,17 +1221,15 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         int encodedCoveredolumnsAndLocalIndex = WritableUtils.readVInt(input);
         isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0;
         int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1;
-        coveredColumns = 
Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns);
         coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns);
         for (int i = 0; i < nCoveredColumns; i++) {
             byte[] dataTableCf = Bytes.readByteArray(input);
             byte[] dataTableCq = Bytes.readByteArray(input);
-            byte[] indexTableCf = Bytes.readByteArray(input);
-            byte[] indexTableCq = Bytes.readByteArray(input);
-            ColumnReference dataColumn = new ColumnReference(dataTableCf, 
dataTableCq); 
-            coveredColumns.add(dataColumn);
-            ColumnReference indexColumn = new ColumnReference(indexTableCf, 
indexTableCq);
-            coveredColumnsMap.put(dataColumn, indexColumn);
+            ColumnReference dataTableRef = new ColumnReference(dataTableCf, 
dataTableCq);
+            byte[] indexTableCf = isLocalIndex ? 
IndexUtil.getLocalIndexColumnFamily(dataTableCf) : dataTableCf;
+            byte[] indexTableCq  = IndexUtil.getIndexColumnName(dataTableCf, 
dataTableCq);
+            ColumnReference indexTableRef = new ColumnReference(indexTableCf, 
indexTableCq);
+            coveredColumnsMap.put(dataTableRef, indexTableRef);
         }
         // Hack to serialize whether the index row key is optimizable
         int len = WritableUtils.readVInt(input);
@@ -1234,9 +1257,9 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             int numIndexedExpressions = WritableUtils.readVInt(input);
             indexedExpressions = 
Lists.newArrayListWithExpectedSize(numIndexedExpressions);        
             for (int i = 0; i < numIndexedExpressions; i++) {
-               Expression expression = 
ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
-               expression.readFields(input);
-               indexedExpressions.add(expression);
+                Expression expression = 
ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+                expression.readFields(input);
+                indexedExpressions.add(expression);
             }
         }
         else {
@@ -1285,26 +1308,82 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         int encodedEstimatedIndexRowKeyBytesAndImmutableRows = 
WritableUtils.readVInt(input);
         this.immutableRows = encodedEstimatedIndexRowKeyBytesAndImmutableRows 
< 0;
         this.estimatedIndexRowKeyBytes = 
Math.abs(encodedEstimatedIndexRowKeyBytesAndImmutableRows);
-        int numCols = WritableUtils.readVInt(input);
-        //TODO: samarth figure out a backward compatible way of 
reading/writing indexedColumnsInfo
-        indexedColumnsInfo = Sets.newHashSetWithExpectedSize(numCols);
-        for (int i = 1; i <= numCols; i++) {
-            byte[] dataTableCf = Bytes.readByteArray(input);
-            byte[] dataTableCq = Bytes.readByteArray(input);
-            indexedColumnsInfo.add(new Pair<>(Bytes.toString(dataTableCf), 
Bytes.toString(dataTableCq)));
+        initCachedState();
+    }
+    
+        
+    public static IndexMaintainer 
fromProto(ServerCachingProtos.IndexMaintainer proto, RowKeySchema 
dataTableRowKeySchema, boolean isDataTableSalted) throws IOException {
+        IndexMaintainer maintainer = new 
IndexMaintainer(dataTableRowKeySchema, isDataTableSalted);
+        maintainer.nIndexSaltBuckets = proto.getSaltBuckets();
+        maintainer.isMultiTenant = proto.getIsMultiTenant();
+        maintainer.viewIndexId = proto.hasViewIndexId() ? 
proto.getViewIndexId().toByteArray() : null;
+        List<ServerCachingProtos.ColumnReference> indexedColumnsList = 
proto.getIndexedColumnsList();
+        maintainer.indexedColumns = new 
HashSet<ColumnReference>(indexedColumnsList.size());
+        for (ServerCachingProtos.ColumnReference colRefFromProto : 
indexedColumnsList) {
+            maintainer.indexedColumns.add(new 
ColumnReference(colRefFromProto.getFamily().toByteArray(), 
colRefFromProto.getQualifier().toByteArray()));
+        }
+        List<Integer> indexedColumnTypes = 
proto.getIndexedColumnTypeOrdinalList();
+        maintainer.indexedColumnTypes = new 
ArrayList<PDataType>(indexedColumnTypes.size());
+        for (Integer typeOrdinal : indexedColumnTypes) {
+            maintainer.indexedColumnTypes.add(PDataType.values()[typeOrdinal]);
+        }
+        maintainer.indexTableName = proto.getIndexTableName().toByteArray();
+        maintainer.rowKeyOrderOptimizable = proto.getRowKeyOrderOptimizable();
+        maintainer.dataEmptyKeyValueCF = 
proto.getDataTableEmptyKeyValueColFamily().toByteArray();
+        ServerCachingProtos.ImmutableBytesWritable emptyKeyValueColFamily = 
proto.getEmptyKeyValueColFamily();
+        maintainer.emptyKeyValueCFPtr = new 
ImmutableBytesPtr(emptyKeyValueColFamily.getByteArray().toByteArray(), 
emptyKeyValueColFamily.getOffset(), emptyKeyValueColFamily.getLength());
+        maintainer.indexedExpressions = new ArrayList<>();
+        try (ByteArrayInputStream stream = new 
ByteArrayInputStream(proto.getIndexedExpressions().toByteArray())) {
+            DataInput input = new DataInputStream(stream);
+            while (stream.available() > 0) {
+                int expressionOrdinal = WritableUtils.readVInt(input);
+                Expression expression = 
ExpressionType.values()[expressionOrdinal].newInstance();
+                expression.readFields(input);
+                maintainer.indexedExpressions.add(expression);
+            }
         }
-        coveredColumnsInfo = Sets.newHashSetWithExpectedSize(numCols);
-        int numCoveredCols = WritableUtils.readVInt(input);
-        for (int i = 1; i <= numCoveredCols; i++) {
-            byte[] dataTableCf = Bytes.readByteArray(input);
-            byte[] dataTableCq = Bytes.readByteArray(input);
-            coveredColumnsInfo.add(new Pair<>(Bytes.toString(dataTableCf), 
Bytes.toString(dataTableCq)));
+        maintainer.rowKeyMetaData = newRowKeyMetaData(maintainer, 
dataTableRowKeySchema, maintainer.indexedExpressions.size(), isDataTableSalted, 
maintainer.isMultiTenant);
+        try (ByteArrayInputStream stream = new 
ByteArrayInputStream(proto.getRowKeyMetadata().toByteArray())) {
+            DataInput input = new DataInputStream(stream);
+            maintainer.rowKeyMetaData.readFields(input);   
+        }
+        maintainer.nDataCFs = proto.getNumDataTableColFamilies();
+        maintainer.indexWALDisabled = proto.getIndexWalDisabled();
+        maintainer.estimatedIndexRowKeyBytes = proto.getIndexRowKeyByteSize();
+        maintainer.immutableRows = proto.getImmutable();
+        List<ColumnInfo> indexedColumnInfoList = 
proto.getIndexedColumnInfoList();
+        maintainer.indexedColumnsInfo = Sets.newHashSet();
+        for (ColumnInfo info : indexedColumnInfoList) {
+            maintainer.indexedColumnsInfo.add(new Pair<>(info.getFamilyName(), 
info.getColumnName()));
+        }
+        // proto doesn't support single byte so need an explicit cast here
+        maintainer.encodingScheme = 
PTable.QualifierEncodingScheme.fromSerializedValue((byte)proto.getEncodingScheme());
+        maintainer.immutableStorageScheme = 
PTable.ImmutableStorageScheme.fromSerializedValue((byte)proto.getImmutableStorageScheme());
+        maintainer.isLocalIndex = proto.getIsLocalIndex();
+        
+        List<ServerCachingProtos.ColumnReference> 
dataTableColRefsForCoveredColumnsList = 
proto.getDataTableColRefForCoveredColumnsList();
+        List<ServerCachingProtos.ColumnReference> 
indexTableColRefsForCoveredColumnsList = 
proto.getIndexTableColRefForCoveredColumnsList();
+        maintainer.coveredColumnsMap = 
Maps.newHashMapWithExpectedSize(dataTableColRefsForCoveredColumnsList.size());
+        boolean encodedColumnNames = maintainer.encodingScheme != 
NON_ENCODED_QUALIFIERS;
+        Iterator<ServerCachingProtos.ColumnReference> indexTableColRefItr = 
indexTableColRefsForCoveredColumnsList.iterator();
+        for (ServerCachingProtos.ColumnReference colRefFromProto : 
dataTableColRefsForCoveredColumnsList) {
+            ColumnReference dataTableColRef = new 
ColumnReference(colRefFromProto.getFamily().toByteArray(), 
colRefFromProto.getQualifier( ).toByteArray());
+            ColumnReference indexTableColRef;
+            if (encodedColumnNames) {
+                ServerCachingProtos.ColumnReference fromProto = 
indexTableColRefItr.next(); 
+                indexTableColRef = new 
ColumnReference(fromProto.getFamily().toByteArray(), fromProto.getQualifier( 
).toByteArray());
+            } else {
+                byte[] cq = 
IndexUtil.getIndexColumnName(dataTableColRef.getFamily(), 
dataTableColRef.getQualifier());
+                byte[] cf = maintainer.isLocalIndex ? 
IndexUtil.getLocalIndexColumnFamily(dataTableColRef.getFamily()) : 
dataTableColRef.getFamily();
+                indexTableColRef = new ColumnReference(cf, cq);
+            }
+            maintainer.coveredColumnsMap.put(dataTableColRef, 
indexTableColRef);
         }
-        encodingScheme = WritableUtils.readEnum(input, 
QualifierEncodingScheme.class);
-        immutableStorageScheme = WritableUtils.readEnum(input, 
ImmutableStorageScheme.class);
-        initCachedState();
+        maintainer.initCachedState();
+        return maintainer;
     }
     
+    @Deprecated // Only called by code older than our 4.10 release
     @Override
     public void write(DataOutput output) throws IOException {
         // Encode nIndexSaltBuckets and isMultiTenant together
@@ -1324,14 +1403,10 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             WritableUtils.writeVInt(output, type.ordinal());
         }
         // Encode coveredColumns.size() and whether or not this is a local 
index
-        WritableUtils.writeVInt(output, (coveredColumns.size() + 1) * 
(isLocalIndex ? -1 : 1));
-        for (Entry<ColumnReference, ColumnReference> ref : 
coveredColumnsMap.entrySet()) {
-            ColumnReference dataColumn = ref.getKey();
-            ColumnReference indexColumn = ref.getValue();
-            Bytes.writeByteArray(output, dataColumn.getFamily());
-            Bytes.writeByteArray(output, dataColumn.getQualifier());
-            Bytes.writeByteArray(output, indexColumn.getFamily());
-            Bytes.writeByteArray(output, indexColumn.getQualifier());
+        WritableUtils.writeVInt(output, (coveredColumnsMap.size() + 1) * 
(isLocalIndex ? -1 : 1));
+        for (ColumnReference ref : coveredColumnsMap.keySet()) {
+            Bytes.writeByteArray(output, ref.getFamily());
+            Bytes.writeByteArray(output, ref.getQualifier());
         }
         // TODO: remove when rowKeyOrderOptimizable hack no longer needed
         WritableUtils.writeVInt(output,indexTableName.length * 
(rowKeyOrderOptimizable ? 1 : -1));
@@ -1341,10 +1416,11 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         // when indexedColumnTypes is removed, remove this 
         WritableUtils.writeVInt(output,-emptyKeyValueCFPtr.getLength());
         output.write(emptyKeyValueCFPtr.get(),emptyKeyValueCFPtr.getOffset(), 
emptyKeyValueCFPtr.getLength());
+        
         WritableUtils.writeVInt(output, indexedExpressions.size());
         for (Expression expression : indexedExpressions) {
-               WritableUtils.writeVInt(output, 
ExpressionType.valueOf(expression).ordinal());
-               expression.write(output);
+            WritableUtils.writeVInt(output, 
ExpressionType.valueOf(expression).ordinal());
+            expression.write(output);
         }
         
         rowKeyMetaData.write(output);
@@ -1352,18 +1428,76 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         WritableUtils.writeVInt(output, (nDataCFs + 1) * (indexWALDisabled ? 
-1 : 1));
         // Encode estimatedIndexRowKeyBytes and immutableRows together.
         WritableUtils.writeVInt(output, estimatedIndexRowKeyBytes * 
(immutableRows ? -1 : 1));
-        WritableUtils.writeVInt(output, indexedColumnsInfo.size());
-        for (Pair<String, String> colInfo : indexedColumnsInfo) {
-            Bytes.writeByteArray(output, colInfo.getFirst() == null ? null : 
colInfo.getFirst().getBytes());
-            Bytes.writeByteArray(output, colInfo.getSecond().getBytes());
-        }
-        WritableUtils.writeVInt(output, coveredColumnsInfo.size());
-        for (Pair<String, String> colInfo : coveredColumnsInfo) {
-            Bytes.writeByteArray(output, colInfo.getFirst() == null ? null : 
colInfo.getFirst().getBytes());
-            Bytes.writeByteArray(output, colInfo.getSecond().getBytes());
-        }
-        WritableUtils.writeEnum(output, encodingScheme);
-        WritableUtils.writeEnum(output, immutableStorageScheme);
+    }
+    
+    public static ServerCachingProtos.IndexMaintainer toProto(IndexMaintainer 
maintainer) throws IOException {
+        ServerCachingProtos.IndexMaintainer.Builder builder = 
ServerCachingProtos.IndexMaintainer.newBuilder();
+        builder.setSaltBuckets(maintainer.nIndexSaltBuckets);
+        builder.setIsMultiTenant(maintainer.isMultiTenant);
+        if (maintainer.viewIndexId != null) {
+            builder.setViewIndexId(ByteStringer.wrap(maintainer.viewIndexId));
+        }
+        for (ColumnReference colRef : maintainer.indexedColumns) {
+            ServerCachingProtos.ColumnReference.Builder cRefBuilder =  
ServerCachingProtos.ColumnReference.newBuilder();
+            cRefBuilder.setFamily(ByteStringer.wrap(colRef.getFamily()));
+            cRefBuilder.setQualifier(ByteStringer.wrap(colRef.getQualifier()));
+            builder.addIndexedColumns(cRefBuilder.build());
+        }
+        for (PDataType dataType : maintainer.indexedColumnTypes) {
+            builder.addIndexedColumnTypeOrdinal(dataType.ordinal());
+        }
+        for (Entry<ColumnReference, ColumnReference> e : 
maintainer.coveredColumnsMap.entrySet()) {
+            ServerCachingProtos.ColumnReference.Builder cRefBuilder =  
ServerCachingProtos.ColumnReference.newBuilder();
+            ColumnReference dataTableColRef = e.getKey();
+            
cRefBuilder.setFamily(ByteStringer.wrap(dataTableColRef.getFamily()));
+            
cRefBuilder.setQualifier(ByteStringer.wrap(dataTableColRef.getQualifier()));
+            builder.addDataTableColRefForCoveredColumns(cRefBuilder.build());
+            if (maintainer.encodingScheme != NON_ENCODED_QUALIFIERS) {
+                // We need to serialize the colRefs of index tables only in 
case of encoded column names.
+                ColumnReference indexTableColRef = e.getValue();
+                cRefBuilder =  
ServerCachingProtos.ColumnReference.newBuilder();
+                
cRefBuilder.setFamily(ByteStringer.wrap(indexTableColRef.getFamily()));
+                
cRefBuilder.setQualifier(ByteStringer.wrap(indexTableColRef.getQualifier()));
+                
builder.addIndexTableColRefForCoveredColumns(cRefBuilder.build());
+            }
+        }
+        builder.setIsLocalIndex(maintainer.isLocalIndex);
+        
builder.setIndexTableName(ByteStringer.wrap(maintainer.indexTableName));
+        builder.setRowKeyOrderOptimizable(maintainer.rowKeyOrderOptimizable);
+        
builder.setDataTableEmptyKeyValueColFamily(ByteStringer.wrap(maintainer.dataEmptyKeyValueCF));
+        ServerCachingProtos.ImmutableBytesWritable.Builder ibwBuilder = 
ServerCachingProtos.ImmutableBytesWritable.newBuilder();
+        
ibwBuilder.setByteArray(ByteStringer.wrap(maintainer.emptyKeyValueCFPtr.get()));
+        ibwBuilder.setLength(maintainer.emptyKeyValueCFPtr.getLength());
+        ibwBuilder.setOffset(maintainer.emptyKeyValueCFPtr.getOffset());
+        builder.setEmptyKeyValueColFamily(ibwBuilder.build());
+        try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+            DataOutput output = new DataOutputStream(stream);
+            for (Expression expression : maintainer.indexedExpressions) {
+                WritableUtils.writeVInt(output, 
ExpressionType.valueOf(expression).ordinal());
+                expression.write(output);
+            }
+            
builder.setIndexedExpressions(ByteStringer.wrap(stream.toByteArray()));
+        }
+        try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+            DataOutput output = new DataOutputStream(stream);
+            maintainer.rowKeyMetaData.write(output);
+            builder.setRowKeyMetadata(ByteStringer.wrap(stream.toByteArray()));
+        }
+        builder.setNumDataTableColFamilies(maintainer.nDataCFs);
+        builder.setIndexWalDisabled(maintainer.indexWALDisabled);
+        builder.setIndexRowKeyByteSize(maintainer.estimatedIndexRowKeyBytes);
+        builder.setImmutable(maintainer.immutableRows);
+        for (Pair<String, String> p : maintainer.indexedColumnsInfo) {
+            ServerCachingProtos.ColumnInfo.Builder ciBuilder = 
ServerCachingProtos.ColumnInfo.newBuilder();
+            if (p.getFirst() != null) {
+                ciBuilder.setFamilyName(p.getFirst());
+            }
+            ciBuilder.setColumnName(p.getSecond());
+            builder.addIndexedColumnInfo(ciBuilder.build());
+        }
+        
builder.setEncodingScheme(maintainer.encodingScheme.getSerializedMetadataValue());
+        
builder.setImmutableStorageScheme(maintainer.immutableStorageScheme.getSerializedMetadataValue());
+        return builder.build();
     }
 
     public int getEstimatedByteSize() {
@@ -1381,8 +1515,9 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             PDataType type = indexedColumnTypes.get(i);
             size += WritableUtils.getVIntSize(type.ordinal());
         }
-        size += WritableUtils.getVIntSize(coveredColumns.size());
-        for (ColumnReference ref : coveredColumns) {
+        Set<ColumnReference> dataTableColRefs = coveredColumnsMap.keySet();
+        size += WritableUtils.getVIntSize(dataTableColRefs.size());
+        for (ColumnReference ref : dataTableColRefs) {
             size += 
WritableUtils.getVIntSize(ref.getFamilyWritable().getSize());
             size += ref.getFamily().length;
             size += 
WritableUtils.getVIntSize(ref.getQualifierWritable().getSize());
@@ -1412,8 +1547,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     private void initCachedState() {
         byte[] emptyKvQualifier = 
EncodedColumnsUtil.getEmptyKeyValueInfo(encodingScheme).getFirst();
         dataEmptyKeyValueRef = new 
ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier);
-        emptyKeyValueQualifierPtr = new ImmutableBytesPtr(emptyKvQualifier);
-        this.allColumns = 
Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + 
coveredColumns.size());
+        this.allColumns = 
Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + 
coveredColumnsMap.size());
         // columns that are required to evaluate all expressions in 
indexedExpressions (not including columns in data row key)
         this.indexedColumns = 
Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size());
         for (Expression expression : indexedExpressions) {
@@ -1429,7 +1563,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             expression.accept(visitor);
         }
         allColumns.addAll(indexedColumns);
-        allColumns.addAll(coveredColumns);
+        allColumns.addAll(coveredColumnsMap.keySet());
         
         int dataPkOffset = (isDataTableSalted ? 1 : 0) + (isMultiTenant ? 1 : 
0);
         int nIndexPkColumns = getIndexPkColumnCount();
@@ -1473,12 +1607,21 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     }
 
     private int getIndexPkColumnCount() {
-        return dataRowKeySchema.getFieldCount() + indexedExpressions.size() - 
(isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0);
+        return getIndexPkColumnCount(dataRowKeySchema, 
indexedExpressions.size(), isDataTableSalted, isMultiTenant);
+    }
+    
+    private static int getIndexPkColumnCount(RowKeySchema rowKeySchema, int 
numIndexExpressions, boolean isDataTableSalted, boolean isMultiTenant) {
+        return rowKeySchema.getFieldCount() + numIndexExpressions - 
(isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0);
     }
     
     private RowKeyMetaData newRowKeyMetaData() {
         return getIndexPkColumnCount() < 0xFF ? new ByteSizeRowKeyMetaData() : 
new IntSizedRowKeyMetaData();
     }
+    
+    private static RowKeyMetaData newRowKeyMetaData(IndexMaintainer i, 
RowKeySchema rowKeySchema, int numIndexExpressions, boolean isDataTableSalted, 
boolean isMultiTenant) {
+        int indexPkColumnCount = getIndexPkColumnCount(rowKeySchema, 
numIndexExpressions, isDataTableSalted, isMultiTenant);
+        return indexPkColumnCount < 0xFF ? i.new ByteSizeRowKeyMetaData() : 
i.new IntSizedRowKeyMetaData();
+    }
 
     private RowKeyMetaData newRowKeyMetaData(int capacity) {
         return capacity < 0xFF ? new ByteSizeRowKeyMetaData(capacity) : new 
IntSizedRowKeyMetaData(capacity);
@@ -1687,11 +1830,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     }
     
     public byte[] getEmptyKeyValueQualifier() {
-        return emptyKeyValueQualifierPtr.copyBytes();
-    }
-    
-    public Set<Pair<String, String>> getCoveredColumnInfo() {
-        return coveredColumnsInfo;
+        return dataEmptyKeyValueRef.getQualifier();
     }
     
     public Set<Pair<String, String>> getIndexedColumnInfo() {
@@ -1701,4 +1840,5 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     public ImmutableStorageScheme getIndexStorageScheme() {
         return immutableStorageScheme;
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
index 05a01b9..fcabdfd 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
@@ -93,4 +93,5 @@ public class IndexMetaDataCacheClient {
          */
         return serverCache.addServerCache(ranges, ptr, txState, new 
IndexMetaDataCacheFactory(), cacheUsingTableRef);
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
index 56849fe..9edcafc 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
@@ -47,10 +47,10 @@ public class IndexMetaDataCacheFactory implements 
ServerCacheFactory {
     }
 
     @Override
-    public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] 
txState, final MemoryChunk chunk) throws SQLException {
+    public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] 
txState, final MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws 
SQLException {
         // just use the standard keyvalue builder - this doesn't really need 
to be fast
         final List<IndexMaintainer> maintainers = 
-                IndexMaintainer.deserialize(cachePtr, 
GenericKeyValueBuilder.INSTANCE);
+                IndexMaintainer.deserialize(cachePtr, 
GenericKeyValueBuilder.INSTANCE, useProtoForIndexMaintainer);
         final Transaction txn;
         try {
             txn = txState.length!=0 ? MutationState.decodeTransaction(txState) 
: null;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 9d2955b..4116101 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -38,6 +38,7 @@ import com.google.common.collect.Lists;
  */
 public class PhoenixIndexCodec extends BaseIndexCodec {
     public static final String INDEX_MD = "IdxMD";
+    public static final String INDEX_PROTO_MD = "IdxProtoMD";
     public static final String INDEX_UUID = "IdxUUID";
     public static final String INDEX_MAINTAINERS = "IndexMaintainers";
     private static KeyValueBuilder KV_BUILDER = 
GenericKeyValueBuilder.INSTANCE;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index d22e957..39473dc 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -47,10 +47,15 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         if (attributes == null) { return 
IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
         byte[] uuid = attributes.get(PhoenixIndexCodec.INDEX_UUID);
         if (uuid == null) { return 
IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
-        byte[] md = attributes.get(PhoenixIndexCodec.INDEX_MD);
+        boolean useProto = false;
+        byte[] md = attributes.get(PhoenixIndexCodec.INDEX_PROTO_MD);
+        useProto = md != null;
+        if (md == null) {
+            md = attributes.get(PhoenixIndexCodec.INDEX_MD);
+        }
         byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
         if (md != null) {
-            final List<IndexMaintainer> indexMaintainers = 
IndexMaintainer.deserialize(md);
+            final List<IndexMaintainer> indexMaintainers = 
IndexMaintainer.deserialize(md, useProto);
             final Transaction txn = MutationState.decodeTransaction(txState);
             return new IndexMetaDataCache() {
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java 
b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index 2d7550a..1985c2e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -69,7 +69,7 @@ public class HashCacheFactory implements ServerCacheFactory {
     }
 
     @Override
-    public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, 
MemoryChunk chunk) throws SQLException {
+    public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, 
MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException {
         try {
             // This reads the uncompressed length from the front of the 
compressed input
             int uncompressedLen = Snappy.getUncompressedLength(cachePtr.get(), 
cachePtr.getOffset());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 81c3e75..1de57cd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -156,6 +156,7 @@ import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -2178,7 +2179,7 @@ public class MetaDataClient {
                 Integer encodedCQ =  isPkColumn ? null : 
cqCounter.getNextQualifier(cqCounterFamily);
                 byte[] columnQualifierBytes = null;
                 try {
-                    columnQualifierBytes = 
EncodedColumnsUtil.getColumnQualifierBytes(columnDefName.getColumnName(), 
encodedCQ, encodingScheme);
+                    columnQualifierBytes = 
EncodedColumnsUtil.getColumnQualifierBytes(columnDefName.getColumnName(), 
encodedCQ, encodingScheme, isPkColumn);
                 }
                 catch (QualifierOutOfRangeException e) {
                     throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.MAX_COLUMNS_EXCEEDED)
@@ -3268,7 +3269,7 @@ public class MetaDataClient {
                             }
                             byte[] columnQualifierBytes = null;
                             try {
-                                columnQualifierBytes = 
EncodedColumnsUtil.getColumnQualifierBytes(colDef.getColumnDefName().getColumnName(),
 encodedCQ, table);
+                                columnQualifierBytes = 
EncodedColumnsUtil.getColumnQualifierBytes(colDef.getColumnDefName().getColumnName(),
 encodedCQ, table, colDef.isPK());
                             }
                             catch (QualifierOutOfRangeException e) {
                                 throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.MAX_COLUMNS_EXCEEDED)
@@ -3604,9 +3605,11 @@ public class MetaDataClient {
                     // get the covered columns 
                     List<PColumn> indexColumnsToDrop = 
Lists.newArrayListWithExpectedSize(columnRefs.size());
                     Set<Pair<String, String>> indexedColsInfo = 
indexMaintainer.getIndexedColumnInfo();
-                    Set<Pair<String, String>> coveredColsInfo = 
indexMaintainer.getCoveredColumnInfo();
+                    Set<ColumnReference> coveredCols = 
indexMaintainer.getCoveredColumns();
                     for(PColumn columnToDrop : tableColumnsToDrop) {
                         Pair<String, String> columnToDropInfo = new 
Pair<>(columnToDrop.getFamilyName().getString(), 
columnToDrop.getName().getString());
+                        ColumnReference colDropRef = new 
ColumnReference(columnToDrop.getFamilyName() == null ? null
+                                : columnToDrop.getFamilyName().getBytes(), 
columnToDrop.getColumnQualifierBytes());
                         boolean isColumnIndexed = 
indexedColsInfo.contains(columnToDropInfo);
                         if (isColumnIndexed) {
                             if (index.getViewIndexId() == null) { 
@@ -3614,8 +3617,7 @@ public class MetaDataClient {
                             }
                             connection.removeTable(tenantId, 
SchemaUtil.getTableName(schemaName, index.getName().getString()), 
index.getParentName() == null ? null : index.getParentName().getString(), 
index.getTimeStamp());
                             removedIndexTableOrColumn = true;
-                        }
-                        else if (coveredColsInfo.contains(columnToDropInfo)) {
+                        } else if (coveredCols.contains(colDropRef)) {
                             String indexColumnName = 
IndexUtil.getIndexColumnName(columnToDrop);
                             PColumn indexColumn = 
index.getPColumnForColumnName(indexColumnName);
                             indexColumnsToDrop.add(indexColumn);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
index 16054b4..78baa4c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SizedUtil;
 
 import com.google.common.base.Preconditions;
@@ -210,6 +211,10 @@ public class PColumnImpl implements PColumn {
     
     @Override
     public byte[] getColumnQualifierBytes() {
+        // Needed for backward compatibility
+        if (!SchemaUtil.isPKColumn(this) && columnQualifierBytes == null) {
+            return this.name.getBytes();
+        }
         return columnQualifierBytes;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
index e99003f..5a5b355 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
@@ -361,6 +361,11 @@ public class EncodedColumnQualiferCellsList implements 
List<Cell> {
         return getCellForColumnQualifier(columnQualifier);
     }
     
+    public Cell getCellForColumnQualifier(byte[] qualifierBytes, int offset, 
int length) {
+        int columnQualifier = encodingScheme.decode(qualifierBytes, offset, 
length);
+        return getCellForColumnQualifier(columnQualifier);
+    }
+    
     private Cell getCellForColumnQualifier(int columnQualifier) {
         checkQualifierRange(columnQualifier);
         int idx = getArrayIndex(columnQualifier);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
index ce2c98c..725161a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
@@ -66,11 +66,13 @@ public class EncodedColumnsUtil {
     }
     
     public static QualifierEncodingScheme getQualifierEncodingScheme(Scan s) {
-        return 
QualifierEncodingScheme.fromSerializedValue(s.getAttribute(BaseScannerRegionObserver.QUALIFIER_ENCODING_SCHEME)[0]);
+        // null check for backward compatibility
+        return 
s.getAttribute(BaseScannerRegionObserver.QUALIFIER_ENCODING_SCHEME) == null ? 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : 
QualifierEncodingScheme.fromSerializedValue(s.getAttribute(BaseScannerRegionObserver.QUALIFIER_ENCODING_SCHEME)[0]);
     }
     
     public static ImmutableStorageScheme getImmutableStorageScheme(Scan s) {
-        return 
ImmutableStorageScheme.fromSerializedValue(s.getAttribute(BaseScannerRegionObserver.IMMUTABLE_STORAGE_ENCODING_SCHEME)[0]);
+        // null check for backward compatibility
+        return 
s.getAttribute(BaseScannerRegionObserver.IMMUTABLE_STORAGE_ENCODING_SCHEME) == 
null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : 
ImmutableStorageScheme.fromSerializedValue(s.getAttribute(BaseScannerRegionObserver.IMMUTABLE_STORAGE_ENCODING_SCHEME)[0]);
     }
 
     /**
@@ -157,18 +159,19 @@ public class EncodedColumnsUtil {
         return toReturn;
     }
     
-    public static byte[] getColumnQualifierBytes(String columnName, Integer 
numberBasedQualifier, PTable table) {
+    public static byte[] getColumnQualifierBytes(String columnName, Integer 
numberBasedQualifier, PTable table, boolean isPk) {
         QualifierEncodingScheme encodingScheme = table.getEncodingScheme();
-        return getColumnQualifierBytes(columnName, numberBasedQualifier, 
encodingScheme);
+        return getColumnQualifierBytes(columnName, numberBasedQualifier, 
encodingScheme, isPk);
     }
     
-    public static byte[] getColumnQualifierBytes(String columnName, Integer 
numberBasedQualifier, QualifierEncodingScheme encodingScheme) {
-        if (encodingScheme == NON_ENCODED_QUALIFIERS) {
+    public static byte[] getColumnQualifierBytes(String columnName, Integer 
numberBasedQualifier, QualifierEncodingScheme encodingScheme, boolean isPk) {
+        if (isPk) {
+            return null;
+        }
+        if (encodingScheme == null || encodingScheme == 
NON_ENCODED_QUALIFIERS) {
             return Bytes.toBytes(columnName);
-        } else if (numberBasedQualifier != null) {
-            return encodingScheme.encode(numberBasedQualifier);
         }
-        return null;
+        return encodingScheme.encode(numberBasedQualifier);
     }
     
     public static Expression[] createColumnExpressionArray(int 
maxEncodedColumnQualifier) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 217c99e..9c7f9ba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -181,6 +181,11 @@ public class IndexUtil {
                 : QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX + 
dataColumnFamilyName;
     }
     
+    public static byte[] getLocalIndexColumnFamily(byte[] 
dataColumnFamilyBytes) {
+        String dataCF = Bytes.toString(dataColumnFamilyBytes);
+        return getLocalIndexColumnFamily(dataCF).getBytes();
+    }
+    
     public static PColumn getDataColumn(PTable dataTable, String 
indexColumnName) {
         int pos = indexColumnName.indexOf(INDEX_COLUMN_NAME_SEP);
         if (pos < 0) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
index ac2a850..ade5239 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
@@ -47,7 +47,7 @@ public class TenantCacheTest {
         TenantCacheImpl newTenantCache = new TenantCacheImpl(memoryManager, 
maxServerCacheTimeToLive);
         ImmutableBytesPtr cacheId = new ImmutableBytesPtr(Bytes.toBytes("a"));
         ImmutableBytesWritable cachePtr = new 
ImmutableBytesWritable(Bytes.toBytes("a"));
-        newTenantCache.addServerCache(cacheId, cachePtr, 
ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory);
+        newTenantCache.addServerCache(cacheId, cachePtr, 
ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true);
         assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
         newTenantCache.removeServerCache(cacheId);
         assertEquals(maxBytes, memoryManager.getAvailableMemory());
@@ -63,7 +63,7 @@ public class TenantCacheTest {
         TenantCacheImpl cache = new TenantCacheImpl(memoryManager, 
maxServerCacheTimeToLive, ticker);
         ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a"));
         ImmutableBytesWritable cachePtr = new 
ImmutableBytesWritable(Bytes.toBytes("a"));
-        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, 
cacheFactory);
+        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, 
cacheFactory, true);
         assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
         ticker.time += (maxServerCacheTimeToLive + 1) * 1000000;
         cache.cleanUp();
@@ -91,7 +91,7 @@ public class TenantCacheTest {
         }
 
         @Override
-        public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] 
txState, MemoryChunk chunk)
+        public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] 
txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer)
                 throws SQLException {
             return chunk;
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
index 5887e5b..0d4a52f 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
@@ -109,7 +109,7 @@ public class IndexMaintainerTest  extends 
BaseConnectionlessQueryTest {
             PTable index = pconn.getTable(new 
PTableKey(pconn.getTenantId(),fullIndexName));
             ImmutableBytesWritable ptr = new ImmutableBytesWritable();
             table.getIndexMaintainers(ptr, pconn);
-            List<IndexMaintainer> c1 = IndexMaintainer.deserialize(ptr, 
builder);
+            List<IndexMaintainer> c1 = IndexMaintainer.deserialize(ptr, 
builder, true);
             assertEquals(1,c1.size());
             IndexMaintainer im1 = c1.get(0);
             
@@ -310,7 +310,7 @@ public class IndexMaintainerTest  extends 
BaseConnectionlessQueryTest {
             PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), 
"FHA"));
             ImmutableBytesWritable ptr = new ImmutableBytesWritable();
             table.getIndexMaintainers(ptr, pconn);
-            List<IndexMaintainer> indexMaintainerList = 
IndexMaintainer.deserialize(ptr, GenericKeyValueBuilder.INSTANCE);
+            List<IndexMaintainer> indexMaintainerList = 
IndexMaintainer.deserialize(ptr, GenericKeyValueBuilder.INSTANCE, true);
             assertEquals(1,indexMaintainerList.size());
             IndexMaintainer indexMaintainer = indexMaintainerList.get(0);
             Set<ColumnReference> indexedColumns = 
indexMaintainer.getIndexedColumns();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-protocol/src/main/ServerCachingService.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/ServerCachingService.proto 
b/phoenix-protocol/src/main/ServerCachingService.proto
index a45b18f..044c111 100644
--- a/phoenix-protocol/src/main/ServerCachingService.proto
+++ b/phoenix-protocol/src/main/ServerCachingService.proto
@@ -30,12 +30,47 @@ message ImmutableBytesWritable {
   required int32 length = 3;
 }
 
+message ColumnReference {
+  required bytes family = 1;
+  required bytes qualifier = 2;
+}
+
+message ColumnInfo {
+  optional string familyName = 1;
+  required string columnName = 2;
+}
+
+message IndexMaintainer {
+  required int32 saltBuckets = 1;
+  required bool isMultiTenant = 2;
+  optional bytes viewIndexId = 3;
+  repeated ColumnReference indexedColumns = 4;
+  repeated int32 indexedColumnTypeOrdinal = 5;
+  repeated ColumnReference dataTableColRefForCoveredColumns = 6;
+  repeated ColumnReference indexTableColRefForCoveredColumns = 7;
+  required bool isLocalIndex = 8;
+  required bytes indexTableName = 9;
+  required bool rowKeyOrderOptimizable = 10;
+  required bytes dataTableEmptyKeyValueColFamily = 11;
+  required ImmutableBytesWritable emptyKeyValueColFamily = 12;
+  optional bytes indexedExpressions = 13;
+  required bytes rowKeyMetadata = 14;
+  required int32 numDataTableColFamilies = 15;
+  required bool indexWalDisabled = 16;
+  required int32 indexRowKeyByteSize = 17;
+  required bool immutable = 18;
+  repeated ColumnInfo indexedColumnInfo = 19;
+  required int32 encodingScheme = 20;
+  required int32 immutableStorageScheme = 21;
+}
+
 message AddServerCacheRequest {
   optional bytes tenantId = 1;
   required bytes cacheId  = 2;
   required ImmutableBytesWritable cachePtr = 3;
   required ServerCacheFactory cacheFactory = 4;
   optional bytes txState = 5;
+  optional bool hasProtoBufIndexMaintainer = 6;
 }
 
 message AddServerCacheResponse {

Reply via email to