http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 13963d7..5e64209 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -56,6 +56,7 @@ import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.TypeMismatchException;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -170,8 +171,8 @@ public class WhereCompiler {
             TableRef tableRef = ref.getTableRef();
            if (tableRef.equals(context.getCurrentTable()) && !SchemaUtil.isPKColumn(ref.getColumn())) {
                // track the where condition columns. Later we need to ensure the Scan in HRS scans these column CFs
-                context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), ref.getColumn().getName()
-                        .getBytes());
+                byte[] cq = EncodedColumnsUtil.getColumnQualifier(ref.getColumn(), tableRef.getTable());
+                context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), cq);
            }
            return ref.newColumnExpression(node.isTableNameCaseSensitive(), node.isCaseSensitive());
         }
@@ -194,7 +195,7 @@ public class WhereCompiler {
             // just use that.
             try {
                 if (!SchemaUtil.isPKColumn(ref.getColumn())) {
-                    table.getColumn(ref.getColumn().getName().getString());
+                    table.getPColumnForColumnName(ref.getColumn().getName().getString());
                 }
             } catch (AmbiguousColumnException e) {
                 disambiguateWithFamily = true;

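Note: the WhereCompiler change above swaps the raw column-name bytes for EncodedColumnsUtil.getColumnQualifier(...), so the qualifier tracked for the server-side Scan now depends on the table's storage scheme. A minimal standalone sketch of that idea, assuming a 4-byte integer encoding for encoded tables (the names below are illustrative stand-ins, not the Phoenix API):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class QualifierSketch {
        // Returns the bytes used as the HBase column qualifier for a column.
        static byte[] columnQualifier(String columnName, Integer encodedQualifier,
                                      boolean tableEncodesColumnNames) {
            if (tableEncodesColumnNames && encodedQualifier != null) {
                // Encoded scheme: a small counter-assigned integer, not the name.
                return ByteBuffer.allocate(4).putInt(encodedQualifier).array();
            }
            // Non-encoded scheme: fall back to the column name bytes.
            return columnName.getBytes(StandardCharsets.UTF_8);
        }

        public static void main(String[] args) {
            System.out.println(columnQualifier("KV1", 12, true).length);    // 4
            System.out.println(columnQualifier("KV1", null, false).length); // 3
        }
    }

Under that assumption, scans on encoded tables filter on small fixed-width qualifiers instead of variable-length name bytes.
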
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index be4766f..5922b5d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
@@ -51,18 +50,19 @@ import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedResultTuple;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.tephra.Transaction;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
 
 import com.google.common.collect.ImmutableList;
 
-import org.apache.tephra.Transaction;
-
 
 abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     
@@ -79,6 +79,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String DELETE_CQ = "_DeleteCQ";
     public static final String DELETE_CF = "_DeleteCF";
     public static final String EMPTY_CF = "_EmptyCF";
+    public static final String EMPTY_COLUMN_QUALIFIER = "_EmptyColumnQualifier";
     public static final String SPECIFIC_ARRAY_INDEX = "_SpecificArrayIndex";
     public static final String GROUP_BY_LIMIT = "_GroupByLimit";
     public static final String LOCAL_INDEX = "_LocalIndex";
@@ -102,6 +103,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public final static String SCAN_OFFSET = "_RowOffset";
     public static final String SCAN_START_ROW_SUFFIX = "_ScanStartRowSuffix";
     public static final String SCAN_STOP_ROW_SUFFIX = "_ScanStopRowSuffix";
+    public final static String MIN_QUALIFIER = "_MinQualifier";
+    public final static String MAX_QUALIFIER = "_MaxQualifier";
     
     /**
     * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
@@ -261,14 +264,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
      * @param indexMaintainer
      * @param viewConstants
      */
-    protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
+    RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
             final RegionScanner s, final int offset, final Scan scan,
             final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
             final HRegion dataRegion, final IndexMaintainer indexMaintainer,
             final byte[][] viewConstants, final TupleProjector projector,
-            final ImmutableBytesWritable ptr) {
+            final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex) {
         return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector,
-                dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr);
+                dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
     }
     
     /**
@@ -286,7 +289,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
      * @param tx current transaction
      * @param viewConstants
      */
-    protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
+    RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
             final RegionScanner s, final Set<KeyValueColumnExpression> arrayKVRefs,
             final Expression[] arrayFuncRefs, final int offset, final Scan scan,
             final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
@@ -294,7 +297,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             Transaction tx, 
             final byte[][] viewConstants, final KeyValueSchema kvSchema, 
             final ValueBitSet kvSchemaBitSet, final TupleProjector projector,
-            final ImmutableBytesWritable ptr) {
+            final ImmutableBytesWritable ptr, final boolean useQualifierAsListIndex) {
         return new RegionScanner() {
 
             private boolean hasReferences = checkForReferenceFiles();
@@ -391,11 +394,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
                             tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
                     }
                     if (projector != null) {
-                        Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result)));
+                        // TODO: samarth think if this is the right thing to do here.
+                        Tuple toProject = useQualifierAsListIndex ? new PositionBasedResultTuple(result) : new ResultTuple(Result.create(result));
+                        Tuple tuple = projector.projectResults(toProject);
                         result.clear();
                         result.add(tuple.getValue(0));
-                        if(arrayElementCell != null)
+                        if (arrayElementCell != null) {
                             result.add(arrayElementCell);
+                        }
                     }
                    // There is a scanattribute set to retrieve the specific array element
                     return next;
@@ -429,7 +435,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
                             tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
                     }
                     if (projector != null) {
-                        Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result)));
+                        Tuple toProject = useQualifierAsListIndex ? new PositionBasedMultiKeyValueTuple(result) : new ResultTuple(Result.create(result));
+                        Tuple tuple = projector.projectResults(toProject);
                         result.clear();
                         result.add(tuple.getValue(0));
                         if(arrayElementCell != null)
@@ -482,24 +489,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
                 // Using KeyValueSchema to set and retrieve the value
                 // collect the first kv to get the row
                 Cell rowKv = result.get(0);
-                for (KeyValueColumnExpression kvExp : arrayKVRefs) {
-                    if (kvExp.evaluate(tuple, ptr)) {
-                        for (int idx = tuple.size() - 1; idx >= 0; idx--) {
-                            Cell kv = tuple.getValue(idx);
-                            if (Bytes.equals(kvExp.getColumnFamily(), 0, kvExp.getColumnFamily().length,
-                                    kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength())
-                                && Bytes.equals(kvExp.getColumnName(), 0, kvExp.getColumnName().length,
-                                        kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())) {
-                                // remove the kv that has the full array values.
-                                result.remove(idx);
-                                break;
-                            }
-                        }
-                    }
-                }
                 byte[] value = kvSchema.toBytes(tuple, arrayFuncRefs,
                         kvSchemaBitSet, ptr);
                 // Add a dummy kv with the exact value of the array index
+                // TODO: samarth how does this dummy column qualifier play with encoded column names
                 result.add(new KeyValue(rowKv.getRowArray(), rowKv.getRowOffset(), rowKv.getRowLength(),
                         QueryConstants.ARRAY_VALUE_COLUMN_FAMILY, 0, QueryConstants.ARRAY_VALUE_COLUMN_FAMILY.length,
                         QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER, 0,

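Note: the PositionBasedResultTuple/PositionBasedMultiKeyValueTuple introduced above exist so a projected cell can be found by position rather than by comparing family and qualifier bytes per cell. A standalone sketch of the lookup trick, assuming qualifiers are encoded integers in a known [min, max] range (illustrative, not the Phoenix classes):

    import java.nio.ByteBuffer;

    public class PositionLookupSketch {
        private final Object[] cells; // stand-in for Cell[]
        private final int minQualifier;

        PositionLookupSketch(int minQualifier, int maxQualifier) {
            this.minQualifier = minQualifier;
            this.cells = new Object[maxQualifier - minQualifier + 1];
        }

        void put(byte[] qualifier, Object cell) {
            // The encoded qualifier decodes to an array slot directly.
            cells[ByteBuffer.wrap(qualifier).getInt() - minQualifier] = cell;
        }

        // O(1) lookup; no per-cell Bytes.equals() on family and qualifier.
        Object get(byte[] qualifier) {
            return cells[ByteBuffer.wrap(qualifier).getInt() - minQualifier];
        }
    }
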
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index f88a931..089c4fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -56,22 +56,27 @@ public class DelegateRegionScanner implements RegionScanner {
         delegate.close();
     }
 
+    @Override
     public long getMaxResultSize() {
         return delegate.getMaxResultSize();
     }
 
+    @Override
     public boolean next(List<Cell> arg0, int arg1) throws IOException {
         return delegate.next(arg0, arg1);
     }
 
+    @Override
     public boolean next(List<Cell> arg0) throws IOException {
         return delegate.next(arg0);
     }
 
+    @Override
     public boolean nextRaw(List<Cell> arg0, int arg1) throws IOException {
         return delegate.nextRaw(arg0, arg1);
     }
 
+    @Override
     public boolean nextRaw(List<Cell> arg0) throws IOException {
         return delegate.nextRaw(arg0);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index cfe0e4a..9bb47fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -24,6 +24,7 @@ import static org.apache.phoenix.query.QueryServices.GROUPBY_ESTIMATED_DISTINCT_
 import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_SPILLABLE;
+import static org.apache.phoenix.util.ScanUtil.getMinMaxQualifiersFromScan;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
@@ -63,7 +65,10 @@ import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.IndexUtil;
@@ -132,6 +137,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
 
         final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+        boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), j != null);
         if (ScanUtil.isLocalIndex(scan) || (j == null && p != null)) {
             if (dataColumns != null) {
                 tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
@@ -140,7 +146,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
             innerScanner =
                     getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector, 
-                            c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
+                            c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex);
         } 
 
         if (j != null) {
@@ -156,9 +162,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
         }
         if (keyOrdered) { // Optimize by taking advantage that the rows are
                           // already in the required group by key order
-            return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit);
+            return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit, j != null);
         } else { // Otherwse, collect them all up in an in memory map
-            return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit);
+            return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit, j != null);
         }
     }
 
@@ -364,7 +370,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
      */
     private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
             final RegionScanner scanner, final List<Expression> expressions,
-            final ServerAggregators aggregators, long limit) throws IOException {
+            final ServerAggregators aggregators, long limit, boolean isJoin) throws IOException {
         if (logger.isDebugEnabled()) {
             logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over unordered rows with scan " + scan
                     + ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan)));
@@ -378,7 +384,8 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             estDistVals = Math.max(MIN_DISTINCT_VALUES, 
                             (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
         }
-
+        Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiersFromScan(scan);
+        boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), isJoin);
         final boolean spillableEnabled =
                 conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
 
@@ -389,12 +396,10 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver {
         boolean success = false;
         try {
             boolean hasMore;
-
-            MultiKeyValueTuple result = new MultiKeyValueTuple();
+            Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
             if (logger.isDebugEnabled()) {
                 logger.debug(LogUtil.addCustomAnnotations("Spillable groupby enabled: " + spillableEnabled, ScanUtil.getCustomAnnotations(scan)));
             }
-
             HRegion region = c.getEnvironment().getRegion();
             boolean acquiredLock = false;
             try {
@@ -402,7 +407,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                 acquiredLock = true;
                 synchronized (scanner) {
                     do {
-                        List<Cell> results = new ArrayList<Cell>();
+                        List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>();
                         // Results are potentially returned even when the return
                         // value of s.next is false
                         // since this is an indication of whether or not there are
@@ -437,7 +442,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             }
         }
     }
-
+    
     /**
     * Used for an aggregate query in which the key order match the group by key order. In this
     * case, we can do the aggregation as we scan, by detecting when the group by key changes.
@@ -446,12 +451,14 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
     */
    private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
            final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
-            final ServerAggregators aggregators, final long limit) throws IOException {
+            final ServerAggregators aggregators, final long limit, final boolean isJoin) throws IOException {
 
         if (logger.isDebugEnabled()) {
             logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation 
over ordered rows with scan " + scan + ", group by "
                     + expressions + ", aggregators " + aggregators, 
ScanUtil.getCustomAnnotations(scan)));
         }
+        final Pair<Integer, Integer> minMaxQualifiers = 
getMinMaxQualifiersFromScan(scan);
+        final boolean useQualifierAsIndex = 
ScanUtil.useQualifierAsIndex(minMaxQualifiers, isJoin);
         return new BaseRegionScanner(scanner) {
             private long rowCount = 0;
             private ImmutableBytesWritable currentKey = null;
@@ -461,7 +468,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                 boolean hasMore;
                 boolean atLimit;
                 boolean aggBoundary = false;
-                MultiKeyValueTuple result = new MultiKeyValueTuple();
+                Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
                 ImmutableBytesWritable key = null;
                 Aggregator[] rowAggregators = aggregators.getAggregators();
                // If we're calculating no aggregate functions, we can exit at the
@@ -474,7 +481,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                     acquiredLock = true;
                     synchronized (scanner) {
                         do {
-                            List<Cell> kvs = new ArrayList<Cell>();
+                            List<Cell> kvs = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>();
                             // Results are potentially returned even when the return
                             // value of s.next is false
                             // since this is an indication of whether or not there
@@ -512,6 +519,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                            KeyValueUtil.newKeyValue(currentKey.get(), currentKey.getOffset(),
                                currentKey.getLength(), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
                                 AGG_TIMESTAMP, value, 0, value.length);
+                    //TODO: samarth aaha how do we handle this. It looks like we are adding stuff like this to the results
+                    // that we are returning. Bounded skip null cell list won't handle this properly. Interesting. So how do we
+                    // handle this. Does having a reserved set of column qualifiers help here? 
                     results.add(keyValue);
                     if (logger.isDebugEnabled()) {
                         logger.debug(LogUtil.addCustomAnnotations("Adding new aggregate row: "

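Note: the EncodedColumnQualiferCellsList used above replaces ArrayList<Cell> when qualifiers are encoded, and the TODO about the synthesized aggregate KeyValue flags that such cells fall outside the qualifier range. A sketch of the list's basic shape under that assumption (illustrative, not the Phoenix implementation):

    import java.util.ArrayList;
    import java.util.List;

    public class BoundedCellListSketch<C> {
        private final Object[] slots;
        private final int min;

        BoundedCellListSketch(int minQualifier, int maxQualifier) {
            this.min = minQualifier;
            this.slots = new Object[maxQualifier - minQualifier + 1];
        }

        // add() only works for qualifiers inside [min, max]; a cell with a
        // reserved or out-of-range qualifier needs special handling.
        void add(int encodedQualifier, C cell) {
            slots[encodedQualifier - min] = cell;
        }

        @SuppressWarnings("unchecked")
        List<C> asList() {
            List<C> out = new ArrayList<>();
            for (Object o : slots) {
                if (o != null) out.add((C) o); // skip-null iteration
            }
            return out;
        }
    }
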
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 2650225..8c2c3d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -110,7 +110,7 @@ public class HashJoinRegionScanner implements RegionScanner {
     private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException {
         if (result.isEmpty())
             return;
-        
+        //TODO: samarth make joins work with position based lookup.
         Tuple tuple = new ResultTuple(Result.create(result));
        // For backward compatibility. In new versions, HashJoinInfo.forceProjection()
         // always returns true.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 5d20fbb..4f41bf6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -27,6 +27,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME_INDEX;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE_BYTES;
@@ -34,6 +35,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYT
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FAMILY_NAME_INDEX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES;
@@ -57,6 +59,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME_INDEX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME_INDEX;
@@ -188,8 +191,10 @@ import org.apache.phoenix.schema.PMetaDataEntity;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.EncodedCQCounter;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.LinkType;
+import org.apache.phoenix.schema.PTable.StorageScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
@@ -204,10 +209,12 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PSmallint;
+import org.apache.phoenix.schema.types.PTinyint;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
@@ -278,6 +285,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES);
     private static final KeyValue AUTO_PARTITION_SEQ_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, AUTO_PARTITION_SEQ_BYTES);
     private static final KeyValue APPEND_ONLY_SCHEMA_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, APPEND_ONLY_SCHEMA_BYTES);
+    private static final KeyValue STORAGE_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORAGE_SCHEME_BYTES);
+    private static final KeyValue QUALIIFIER_COUNTER_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_QUALIFIER_COUNTER_BYTES);
     
    private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
             EMPTY_KEYVALUE_KV,
@@ -304,7 +313,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             UPDATE_CACHE_FREQUENCY_KV,
             IS_NAMESPACE_MAPPED_KV,
             AUTO_PARTITION_SEQ_KV,
-            APPEND_ONLY_SCHEMA_KV
+            APPEND_ONLY_SCHEMA_KV,
+            STORAGE_SCHEME_KV
             );
     static {
         Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -334,6 +344,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int IS_NAMESPACE_MAPPED_INDEX = TABLE_KV_COLUMNS.indexOf(IS_NAMESPACE_MAPPED_KV);
     private static final int AUTO_PARTITION_SEQ_INDEX = TABLE_KV_COLUMNS.indexOf(AUTO_PARTITION_SEQ_KV);
     private static final int APPEND_ONLY_SCHEMA_INDEX = TABLE_KV_COLUMNS.indexOf(APPEND_ONLY_SCHEMA_KV);
+    private static final int STORAGE_SCHEME_INDEX = TABLE_KV_COLUMNS.indexOf(STORAGE_SCHEME_KV);
 
     // KeyValues for Column
    private static final KeyValue DECIMAL_DIGITS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
@@ -347,6 +358,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final KeyValue IS_VIEW_REFERENCED_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_VIEW_REFERENCED_BYTES);
     private static final KeyValue COLUMN_DEF_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_DEF_BYTES);
     private static final KeyValue IS_ROW_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ROW_TIMESTAMP_BYTES);
+    private static final KeyValue ENCODED_COLUMN_QUALIFIER_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ENCODED_COLUMN_QUALIFIER_BYTES);
     private static final List<KeyValue> COLUMN_KV_COLUMNS = Arrays.<KeyValue>asList(
             DECIMAL_DIGITS_KV,
             COLUMN_SIZE_KV,
@@ -359,7 +371,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             VIEW_CONSTANT_KV,
             IS_VIEW_REFERENCED_KV,
             COLUMN_DEF_KV,
-            IS_ROW_TIMESTAMP_KV
+            IS_ROW_TIMESTAMP_KV,
+            ENCODED_COLUMN_QUALIFIER_KV
             );
     static {
         Collections.sort(COLUMN_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -375,9 +388,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int IS_VIEW_REFERENCED_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_VIEW_REFERENCED_KV);
     private static final int COLUMN_DEF_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_DEF_KV);
     private static final int IS_ROW_TIMESTAMP_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_ROW_TIMESTAMP_KV);
+    private static final int ENCODED_COLUMN_QUALIFIER_INDEX = COLUMN_KV_COLUMNS.indexOf(ENCODED_COLUMN_QUALIFIER_KV);
     
     private static final int LINK_TYPE_INDEX = 0;
-
+    
    private static final KeyValue CLASS_NAME_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, CLASS_NAME_BYTES);
    private static final KeyValue JAR_PATH_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, JAR_PATH_BYTES);
    private static final KeyValue RETURN_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, RETURN_TYPE_BYTES);
@@ -713,8 +727,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 isRowTimestampKV == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
                         isRowTimestampKV.getValueArray(), isRowTimestampKV.getValueOffset(),
                         isRowTimestampKV.getValueLength()));
+        Cell columnQualifierKV = colKeyValues[ENCODED_COLUMN_QUALIFIER_INDEX];
+        Integer columnQualifier = columnQualifierKV == null ? null :
+            PInteger.INSTANCE.getCodec().decodeInt(columnQualifierKV.getValueArray(), columnQualifierKV.getValueOffset(), SortOrder.getDefault());
 
-        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false);
+        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false, columnQualifier);
         columns.add(column);
     }
     
@@ -922,11 +939,15 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         boolean isAppendOnlySchema = isAppendOnlySchemaKv == null ? false
                 : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isAppendOnlySchemaKv.getValueArray(),
                     isAppendOnlySchemaKv.getValueOffset(), isAppendOnlySchemaKv.getValueLength()));
-        
-        
+        Cell storageSchemeKv = tableKeyValues[STORAGE_SCHEME_INDEX];
+        //TODO: change this once we start having other values for storage schemes
+        StorageScheme storageScheme = storageSchemeKv == null ? StorageScheme.NON_ENCODED_COLUMN_NAMES : StorageScheme
+                .fromSerializedValue((byte)PTinyint.INSTANCE.toObject(storageSchemeKv.getValueArray(),
+                        storageSchemeKv.getValueOffset(), storageSchemeKv.getValueLength()));
         List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
         List<PTable> indexes = new ArrayList<PTable>();
         List<PName> physicalTables = new ArrayList<PName>();
+        int counter = 0;
         while (true) {
           results.clear();
           scanner.next(results);
@@ -934,29 +955,41 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
               break;
           }
           Cell colKv = results.get(LINK_TYPE_INDEX);
-          int colKeyLength = colKv.getRowLength();
-          PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset);
-          int colKeyOffset = offset + colName.getBytes().length + 1;
-          PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
-          if (colName.getString().isEmpty() && famName != null) {
-              LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
-              if (linkType == LinkType.INDEX_TABLE) {
-                  addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
-              } else if (linkType == LinkType.PHYSICAL_TABLE) {
-                  physicalTables.add(famName);
+          if (colKv != null) {
+              int colKeyLength = colKv.getRowLength();
+              PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset);
+              int colKeyOffset = offset + colName.getBytes().length + 1;
+              PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
+                if (colName.getString().isEmpty() && famName != null) {
+                    if (isQualifierCounterKv(colKv)) {
+                        counter = PInteger.INSTANCE.getCodec().decodeInt(colKv.getValueArray(),
+                                colKv.getValueOffset(), SortOrder.ASC);
+                    } else {
+                        LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
+                        if (linkType == LinkType.INDEX_TABLE) {
+                            addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
+                        } else if (linkType == LinkType.PHYSICAL_TABLE) {
+                            physicalTables.add(famName);
+                        }
+                    }
+                } else {
+                  addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null);
               }
-          } else {
-              addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null);
-          }
+          } 
         }
        // Avoid querying the stats table because we're holding the rowLock here. Issuing an RPC to a remote
        // server while holding this lock is a bad idea and likely to cause contention.
+        EncodedCQCounter cqCounter = (storageScheme == StorageScheme.NON_ENCODED_COLUMN_NAMES || tableType == PTableType.VIEW) ? PTable.EncodedCQCounter.NULL_COUNTER : new EncodedCQCounter(counter);
+        PName physicalTableName = physicalTables.isEmpty() ? PNameFactory.newName(SchemaUtil.getPhysicalTableName(
+                Bytes.toBytes(SchemaUtil.getTableName(schemaName.getBytes(), tableName.getBytes())), isNamespaceMapped)
+                .getNameAsString()) : physicalTables.get(0);
+        
        return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum,
                pkName, saltBucketNum, columns, tableType == INDEX ? schemaName : null,
                tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName,
                viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType,
                rowKeyOrderOptimizable, transactional, updateCacheFrequency, baseColumnCount,
-                indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema);
+                indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, storageScheme, cqCounter);
     }
 
    private PSchema getSchema(RegionScanner scanner, long clientTimeStamp) throws IOException, SQLException {
@@ -978,6 +1011,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         return new PSchema(schemaName.getString(), timeStamp);
     }
 
+    private static boolean isQualifierCounterKv(Cell colKv) {
+        return Bytes.compareTo(colKv.getQualifierArray(), colKv.getQualifierOffset(),
+                colKv.getQualifierLength(), QUALIIFIER_COUNTER_KV.getQualifierArray(),
+                QUALIIFIER_COUNTER_KV.getQualifierOffset(), QUALIIFIER_COUNTER_KV.getQualifierLength()) == 0;
+    }
+
    private PFunction getFunction(RegionScanner scanner, final boolean isReplace, long clientTimeStamp, List<Mutation> deleteMutationsForReplace)
             throws IOException, SQLException {
         List<Cell> results = Lists.newArrayList();
@@ -1928,7 +1967,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     return result;
                 }
                 region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
-                // Invalidate from cache
+                // Invalidate from cache. //TODO: samarth should we invalidate the base table from the cache too here.
                 for (ImmutableBytesPtr invalidateKey : invalidateList) {
                     metaDataCache.invalidate(invalidateKey);
                 }
@@ -2101,6 +2140,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 byte[][] rkmd = new byte[5][];
                 int pkCount = getVarChars(m.getRow(), rkmd);
                 if (pkCount > COLUMN_NAME_INDEX
+                        && rkmd[COLUMN_NAME_INDEX] != null && rkmd[COLUMN_NAME_INDEX].length > 0
                         && Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0
                         && Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) {
                     columnPutsForBaseTable.add(new PutWithOrdinalPosition((Put)m, getInteger((Put)m, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES)));
@@ -2135,8 +2175,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 String columnName = Bytes.toString(rkmd[COLUMN_NAME_INDEX]);
                 String columnFamily = rkmd[FAMILY_NAME_INDEX] == null ? null : Bytes.toString(rkmd[FAMILY_NAME_INDEX]);
                 try {
-                    existingViewColumn = columnFamily == null ? view.getColumn(columnName) : view.getColumnFamily(
-                            columnFamily).getColumn(columnName);
+                    existingViewColumn = columnFamily == null ? view.getPColumnForColumnName(columnName) : view.getColumnFamily(
+                            columnFamily).getPColumnForColumnName(columnName);
                 } catch (ColumnFamilyNotFoundException e) {
                     // ignore since it means that the column family is not present for the column to be added.
                 } catch (ColumnNotFoundException e) {
@@ -2263,26 +2303,26 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     columnsAddedToBaseTable++;
                 }
             }
-            /*
-             * Allow adding a pk columns to base table : 1. if all the view pk columns are exactly the same as the base
-             * table pk columns 2. if we are adding all the existing view pk columns to the base table
-             */ 
-            if (addingExistingPkCol && !viewPkCols.equals(basePhysicalTable.getPKColumns())) {
-                return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
-            }
-            addViewIndexesHeaderRowMutations(mutationsForAddingColumnsToViews, invalidateList, clientTimeStamp, view,
-                    deltaNumPkColsSoFar);
-            
-            /*
-             * Increment the sequence number by 1 if:
-             * 1) For a diverged view, there were columns (pk columns) added to the view.
-             * 2) For a non-diverged view if the base column count changed.
-             */
-            boolean changeSequenceNumber = (isDivergedView(view) && columnsAddedToView > 0)
-                    || (!isDivergedView(view) && columnsAddedToBaseTable > 0);
-            updateViewHeaderRow(basePhysicalTable, tableMetadata, mutationsForAddingColumnsToViews,
-                invalidateList, clientTimeStamp, columnsAddedToView, columnsAddedToBaseTable,
-                viewKey, view, ordinalPositionList, numCols, changeSequenceNumber);
+           /*
+            * Allow adding a pk columns to base table : 1. if all the view pk columns are exactly the same as the base
+            * table pk columns 2. if we are adding all the existing view pk columns to the base table
+            */ 
+           if (addingExistingPkCol && !viewPkCols.equals(basePhysicalTable.getPKColumns())) {
+               return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
+           }
+           addViewIndexesHeaderRowMutations(mutationsForAddingColumnsToViews, invalidateList, clientTimeStamp, view,
+                   deltaNumPkColsSoFar);
+           
+           /*
+            * Increment the sequence number by 1 if:
+            * 1) For a diverged view, there were columns (pk columns) added to the view.
+            * 2) For a non-diverged view if the base column count changed.
+            */
+           boolean changeSequenceNumber = (isDivergedView(view) && columnsAddedToView > 0)
+                   || (!isDivergedView(view) && columnsAddedToBaseTable > 0);
+           updateViewHeaderRow(basePhysicalTable, tableMetadata, mutationsForAddingColumnsToViews,
+               invalidateList, clientTimeStamp, columnsAddedToView, columnsAddedToBaseTable,
+               viewKey, view, ordinalPositionList, numCols, changeSequenceNumber);
         }
         return null;
     }
@@ -2440,8 +2480,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 byte[] columnKey = getColumnKey(viewKey, columnName, columnFamily);
                 try {
                     existingViewColumn =
-                            columnFamily == null ? view.getColumn(columnName) : view
-                                    .getColumnFamily(columnFamily).getColumn(columnName);
+                            columnFamily == null ? view.getPColumnForColumnName(columnName) : view
+                                    .getColumnFamily(columnFamily).getPColumnForColumnName(columnName);
                 } catch (ColumnFamilyNotFoundException e) {
                    // ignore since it means that the column family is not present for the column to
                     // be added.
@@ -2507,7 +2547,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     
     private MetaDataMutationResult validateColumnForAddToBaseTable(PColumn existingViewColumn, Put columnToBeAdded, PTable basePhysicalTable, boolean isColumnToBeAddPkCol, PTable view) {
         if (existingViewColumn != null) {
-            
+            if (EncodedColumnsUtil.usesEncodedColumnNames(basePhysicalTable) && !SchemaUtil.isPKColumn(existingViewColumn)) {
+                return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
+            }
             // Validate data type is same
            int baseColumnDataType = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES);
            if (baseColumnDataType != existingViewColumn.getDataType().getSqlType()) {
@@ -2750,7 +2792,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                         && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
                                     PColumnFamily family =
                                             table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
-                                    family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
+                                    family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
                                 } else if (pkCount > COLUMN_NAME_INDEX
                                         && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
                                     addingPKColumn = true;
@@ -3003,7 +3045,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                         PColumnFamily family =
                                                 table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
                                         columnToDelete =
-                                                family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
+                                                family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
                                     } else if (pkCount > COLUMN_NAME_INDEX
                                             && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
                                         deletePKColumn = true;
@@ -3092,10 +3134,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             byte[] indexKey =
                     SchemaUtil.getTableKey(tenantId, index.getSchemaName().getBytes(), index
                             .getTableName().getBytes());
+            byte[] cq = EncodedColumnsUtil.getColumnQualifier(columnToDelete, index);
+            ColumnReference colRef = new ColumnReference(columnToDelete.getFamilyName().getBytes(), cq);
             // If index requires this column for its pk, then drop it
-            if (indexMaintainer.getIndexedColumns().contains(
-                new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete
-                        .getName().getBytes()))) {
+            if (indexMaintainer.getIndexedColumns().contains(colRef)) {
                 // Since we're dropping the index, lock it to ensure
                 // that a change in index state doesn't
                 // occur while we're dropping it.
@@ -3116,9 +3158,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 invalidateList.add(new ImmutableBytesPtr(indexKey));
             }
            // If the dropped column is a covered index column, invalidate the index
-            else if (indexMaintainer.getCoveredColumns().contains(
-                new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete
-                        .getName().getBytes()))) {
+            else if (indexMaintainer.getCoveredColumns().contains(colRef)){
                 invalidateList.add(new ImmutableBytesPtr(indexKey));
             }
         }

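Note: MetaDataEndpointImpl now reads a qualifier-counter cell (see isQualifierCounterKv) from the table's header rows and carries it as an EncodedCQCounter, with NULL_COUNTER for non-encoded tables and views. A simplified sketch of that bookkeeping, assuming the counter simply hands out consecutive integers that get persisted back to SYSTEM.CATALOG (illustrative stand-ins, not the Phoenix API):

    public class CQCounterSketch {
        private Integer counter; // null models EncodedCQCounter.NULL_COUNTER

        CQCounterSketch(Integer start) { this.counter = start; }

        // Non-encoded tables and views keep a null counter and never assign.
        Integer assignNextQualifier() {
            if (counter == null) return null;
            return counter++; // persisted back so concurrent DDL never reuses a value
        }

        public static void main(String[] args) {
            CQCounterSketch encoded = new CQCounterSketch(0);
            System.out.println(encoded.assignNextQualifier()); // 0
            System.out.println(encoded.assignNextQualifier()); // 1
            System.out.println(new CQCounterSketch(null).assignNextQualifier()); // null
        }
    }
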
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 457555e..accaaa9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
@@ -64,12 +63,11 @@ import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.tephra.Transaction;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
-import org.apache.tephra.Transaction;
-
 
 /**
  *
@@ -108,7 +106,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         }
     }
 
-    public static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) {
+    private static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s, boolean isJoin) {
         byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN);
         if (topN == null) {
             return null;
@@ -126,7 +124,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
                 orderByExpression.readFields(input);
                 orderByExpressions.add(orderByExpression);
             }
-            ResultIterator inner = new RegionScannerResultIterator(s);
+            ResultIterator inner = new RegionScannerResultIterator(s, ScanUtil.getMinMaxQualifiersFromScan(scan), isJoin);
             return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, null,
                     estimatedRowSize);
         } catch (IOException e) {
@@ -219,10 +217,12 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         
        final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
        final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+        //TODO: samarth get rid of this join shit. Joins should support position based look up.
+        boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), j != null) && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null;
         innerScanner =
                 getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan,
                     dataColumns, tupleProjector, dataRegion, indexMaintainer, tx,
-                    viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr);
+                    viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr, useQualifierAsIndex);
 
         final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan);
         if (j != null) {
@@ -230,10 +230,10 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         }
         if (scanOffset != null) {
             innerScanner = getOffsetScanner(c, innerScanner,
-                    new OffsetResultIterator(new RegionScannerResultIterator(innerScanner), scanOffset),
+                    new OffsetResultIterator(new RegionScannerResultIterator(innerScanner, ScanUtil.getMinMaxQualifiersFromScan(scan), j != null), scanOffset),
                     scan.getAttribute(QueryConstants.LAST_SCAN) != null);
         }
-        final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner);
+        final OrderedResultIterator iterator = deserializeFromScan(scan, innerScanner, j != null);
         if (iterator == null) {
             return innerScanner;
         }

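Note: ScanRegionObserver, like the other observers above, gates position-based collections on ScanUtil.useQualifierAsIndex(...), and additionally on the TOPN attribute here; the TODOs indicate joins still force the name-based path. A sketch of that gate under those assumptions (illustrative, not the ScanUtil source):

    public class QualifierGateSketch {
        // Stand-in for org.apache.hadoop.hbase.util.Pair<Integer, Integer>
        // as returned by getMinMaxQualifiersFromScan (null if not set).
        static final class MinMax {
            final int min, max;
            MinMax(int min, int max) { this.min = min; this.max = max; }
        }

        static boolean useQualifierAsIndex(MinMax minMaxQualifiers, boolean isJoin) {
            // No range on the scan -> fall back to ArrayList/MultiKeyValueTuple.
            // Join present -> fall back too, until joins do position-based lookup.
            return minMaxQualifiers != null && !isJoin;
        }

        public static void main(String[] args) {
            System.out.println(useQualifierAsIndex(new MinMax(0, 20), false)); // true
            System.out.println(useQualifierAsIndex(new MinMax(0, 20), true));  // false
            System.out.println(useQualifierAsIndex(null, false));              // false
        }
    }
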
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
index 2e2d580..89ccff0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -80,6 +80,7 @@ public class SequenceRegionObserver extends BaseRegionObserver {
     public static final String NUM_TO_ALLOCATE = "NUM_TO_ALLOCATE";
    private static final byte[] SUCCESS_VALUE = PInteger.INSTANCE.toBytes(Integer.valueOf(Sequence.SUCCESS));
    
+    //TODO: samarth verify that it is ok to send non-encoded empty column here. Probably is.
    private static Result getErrorResult(byte[] row, long timestamp, int errorCode) {
         byte[] errorCodeBuf = new byte[PInteger.INSTANCE.getByteSize()];
         PInteger.INSTANCE.getCodec().encodeInt(errorCode, errorCodeBuf, 0);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index a312020..c1ef0b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -24,6 +24,7 @@ import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT;
 import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT;
+import static org.apache.phoenix.util.ScanUtil.getMinMaxQualifiersFromScan;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -49,10 +50,8 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
@@ -64,7 +63,6 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
@@ -98,7 +96,10 @@ import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
 import org.apache.phoenix.schema.stats.StatisticsCollector;
 import org.apache.phoenix.schema.stats.StatisticsCollectorFactory;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
@@ -114,6 +115,7 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.apache.phoenix.util.TimeKeeper;
+import org.apache.tephra.TxConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -121,8 +123,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
-import org.apache.tephra.TxConstants;
-
 
 /**
  * Region observer that aggregates ungrouped rows(i.e. SQL query with aggregation function and no GROUP BY).
@@ -300,6 +300,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         byte[] deleteCQ = null;
         byte[] deleteCF = null;
         byte[] emptyCF = null;
+        byte[] emptyKVQualifier = null;
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         if (upsertSelectTable != null) {
             isUpsert = true;
@@ -315,12 +316,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 deleteCQ = scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ);
             }
             emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
+            emptyKVQualifier = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER); //TODO: samarth check this
         }
         TupleProjector tupleProjector = null;
         byte[][] viewConstants = null;
         ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
         final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+        boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), j != null) && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null;
         if ((localIndexScan && !isDelete && !isDescRowKeyOrderUpgrade) || (j == null && p != null)) {
             if (dataColumns != null) {
                 tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
@@ -329,7 +332,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
             theScanner =
                     getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector,
-                            c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
+                            c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex);
         } 
         
         if (j != null)  {
@@ -369,7 +372,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         Aggregator[] rowAggregators = aggregators.getAggregators();
         boolean hasMore;
         boolean hasAny = false;
-        MultiKeyValueTuple result = new MultiKeyValueTuple();
+        Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiersFromScan(scan);
+        Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
         if (logger.isDebugEnabled()) {
             logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan)));
         }
@@ -386,7 +390,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             acquiredLock = true;
             synchronized (innerScanner) {
                 do {
-                    List<Cell> results = new ArrayList<Cell>();
+                    List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>();
                     // Results are potentially returned even when the return value of s.next is false
                     // since this is an indication of whether or not there are more values after the
                     // ones returned
@@ -589,8 +593,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     if (!timeStamps.contains(kvts)) {
                                         Put put = new Put(kv.getRowArray(), kv.getRowOffset(),
                                             kv.getRowLength());
-                                        put.add(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts,
-                                            ByteUtil.EMPTY_BYTE_ARRAY);
+                                        // FIXME: Use the right byte array value. Transactional tables can't
+                                        // have empty byte arrays since Tephra sees them as delete markers.
+                                        put.add(emptyCF, emptyKVQualifier != null ? emptyKVQualifier
+                                                : QueryConstants.EMPTY_COLUMN_BYTES, kvts, ByteUtil.EMPTY_BYTE_ARRAY);
                                         mutations.add(put);
                                     }
                                 }
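The per-row buffer choice in the @@ -386,7 hunk follows the same position-based pattern: when qualifiers can serve as indices, the commit sizes an EncodedColumnQualiferCellsList from the scan's [min, max] encoded-qualifier range instead of allocating a plain ArrayList. A minimal sketch under those assumptions (the wrapper class is illustrative; the two-argument constructor is the one this diff calls):

    // A minimal sketch, not the committed code: the per-row cell buffer
    // choice from the @@ -386 hunk above.
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;

    class ResultBufferSketch {
        static List<Cell> newRowBuffer(boolean useQualifierAsIndex,
                                       Pair<Integer, Integer> minMaxQualifiers) {
            // With encoded qualifiers the list is bounded by [min, max], so a
            // cell can be slotted by its qualifier's integer value; otherwise
            // use a plain ArrayList and locate cells by byte[] comparison.
            return useQualifierAsIndex
                    ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(),
                            minMaxQualifiers.getSecond())
                    : new ArrayList<Cell>();
        }
    }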
