http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
index 146028e..3353f9e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
@@ -17,677 +17,8 @@
  */
 package org.apache.phoenix.hbase.index.balancer;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.master.LoadBalancer;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.RegionPlan;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/**
- * <p>This class is an extension of the load balancer class. 
- * It allows to co-locate the regions of the user table and the regions of 
corresponding
- * index table if any.</p> 
- * 
- * </>roundRobinAssignment, retainAssignment -> index regions will follow the 
actual table regions. 
- * randomAssignment, balancerCluster -> either index table or actual table 
region(s) will follow
- * each other based on which ever comes first.</p> 
- * 
- * <p>In case of master failover there is a chance that the znodes of the index
- * table and actual table are left behind. Then in that scenario we may get 
randomAssignment for
- * either the actual table region first or the index table region first.</p>
- * 
- * <p>In case of balancing by table any table can balance first.</p>
- * 
- */
-
-public class IndexLoadBalancer implements LoadBalancer {
-
-    private static final Log LOG = LogFactory.getLog(IndexLoadBalancer.class);
-
-    public static final byte[] PARENT_TABLE_KEY = 
Bytes.toBytes("PARENT_TABLE");
-
-    public static final String INDEX_BALANCER_DELEGATOR = 
"hbase.index.balancer.delegator.class";
-
-    private LoadBalancer delegator;
-
-    private MasterServices master;
-
-    private Configuration conf;
-
-    private ClusterStatus clusterStatus;
-
-    private static final Random RANDOM = new 
Random(EnvironmentEdgeManager.currentTimeMillis());
-
-    Map<TableName, TableName> userTableVsIndexTable = new HashMap<TableName, 
TableName>();
-
-    Map<TableName, TableName> indexTableVsUserTable = new HashMap<TableName, 
TableName>();
-
-    /**
-     * Maintains colocation information of user regions and corresponding 
index regions.
-     */
-    private Map<TableName, Map<ImmutableBytesWritable, ServerName>> 
colocationInfo =
-            new ConcurrentHashMap<TableName, Map<ImmutableBytesWritable, 
ServerName>>();
-
-    private Set<TableName> balancedTables = new HashSet<TableName>();
-
-    private boolean stopped = false;
-
-    @Override
-    public void initialize() throws HBaseIOException {
-        Class<? extends LoadBalancer> delegatorKlass =
-                conf.getClass(INDEX_BALANCER_DELEGATOR, 
StochasticLoadBalancer.class,
-                    LoadBalancer.class);
-        this.delegator = ReflectionUtils.newInstance(delegatorKlass, conf);
-        this.delegator.setClusterStatus(clusterStatus);
-        this.delegator.setMasterServices(this.master);
-        this.delegator.initialize();
-        try {
-            
populateTablesToColocate(this.master.getTableDescriptors().getAll());
-        } catch (IOException e) {
-            throw new HBaseIOException(e);
-        }
-    }
-
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
-
-    @Override
-    public void setConf(Configuration configuration) {
-        this.conf = configuration;
-    }
-
-    @Override
-    public void onConfigurationChange(Configuration conf) {
-        setConf(conf);
-    }
-
-    @Override
-    public void setClusterStatus(ClusterStatus st) {
-        this.clusterStatus = st;
-    }
-
-    public Map<TableName, Map<ImmutableBytesWritable, ServerName>> 
getColocationInfo() {
-        return colocationInfo;
-    }
-
-    @Override
-    public void setMasterServices(MasterServices masterServices) {
-        this.master = masterServices;
-    }
-
-    @Override
-    public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> 
clusterState)
-            throws HBaseIOException {
-        synchronized (this.colocationInfo) {
-            boolean balanceByTable = 
conf.getBoolean("hbase.master.loadbalance.bytable", false);
-            List<RegionPlan> regionPlans = null;
-
-            TableName tableName = null;
-            if (balanceByTable) {
-                Map<ImmutableBytesWritable, ServerName> tableKeys = null;
-                for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : 
clusterState
-                        .entrySet()) {
-                    ServerName sn = serverVsRegionList.getKey();
-                    List<HRegionInfo> regionInfos = 
serverVsRegionList.getValue();
-                    if (regionInfos.isEmpty()) {
-                        continue;
-                    }
-                    if (!isTableColocated(regionInfos.get(0).getTable())) {
-                        return this.delegator.balanceCluster(clusterState);
-                    }
-                    // Just get the table name from any one of the values in 
the regioninfo list
-                    if (tableName == null) {
-                        tableName = regionInfos.get(0).getTable();
-                        tableKeys = this.colocationInfo.get(tableName);
-                    }
-                    // Check and modify the colocation info map based on 
values of cluster state
-                    // because we
-                    // will
-                    // call balancer only when the cluster is in stable and 
reliable state.
-                    if (tableKeys != null) {
-                        for (HRegionInfo hri : regionInfos) {
-                            updateServer(tableKeys, sn, hri);
-                        }
-                    }
-                }
-                // If user table is already balanced find the index table 
plans from the user table
-                // plans
-                // or vice verca.
-                TableName mappedTableName = 
getMappedTableToColocate(tableName);
-                if (balancedTables.contains(mappedTableName)) {
-                    balancedTables.remove(mappedTableName);
-                    regionPlans = new ArrayList<RegionPlan>();
-                    return prepareRegionPlansForClusterState(clusterState, 
regionPlans);
-                } else {
-                    balancedTables.add(tableName);
-                    regionPlans = this.delegator.balanceCluster(clusterState);
-                    if (regionPlans == null) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug(tableName + " regions already 
balanced.");
-                        }
-                        return null;
-                    } else {
-                        updateRegionPlans(regionPlans);
-                        return regionPlans;
-                    }
-                }
-
-            } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Seperating user tables and index tables regions 
of "
-                            + "each region server in the cluster.");
-                }
-                Map<ServerName, List<HRegionInfo>> userClusterState =
-                        new HashMap<ServerName, List<HRegionInfo>>();
-                Map<ServerName, List<HRegionInfo>> indexClusterState =
-                        new HashMap<ServerName, List<HRegionInfo>>();
-                for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : 
clusterState
-                        .entrySet()) {
-                    ServerName sn = serverVsRegionList.getKey();
-                    List<HRegionInfo> regionsInfos = 
serverVsRegionList.getValue();
-                    List<HRegionInfo> idxRegionsToBeMoved = new 
ArrayList<HRegionInfo>();
-                    List<HRegionInfo> userRegionsToBeMoved = new 
ArrayList<HRegionInfo>();
-                    for (HRegionInfo hri : regionsInfos) {
-                        if (hri.isMetaRegion()) {
-                            continue;
-                        }
-                        tableName = hri.getTable();
-                        // Check and modify the colocation info map based on 
values of cluster state
-                        // because we
-                        // will
-                        // call balancer only when the cluster is in stable 
and reliable state.
-                        if (isTableColocated(tableName)) {
-                            // table name may change every time thats why 
always need to get table
-                            // entries.
-                            Map<ImmutableBytesWritable, ServerName> tableKeys =
-                                    this.colocationInfo.get(tableName);
-                            if (tableKeys != null) {
-                                updateServer(tableKeys, sn, hri);
-                            }
-                        }
-                        if (indexTableVsUserTable.containsKey(tableName)) {
-                            idxRegionsToBeMoved.add(hri);
-                            continue;
-                        }
-                        userRegionsToBeMoved.add(hri);
-                    }
-                    // there may be dummy entries here if assignments by table 
is set
-                    userClusterState.put(sn, userRegionsToBeMoved);
-                    indexClusterState.put(sn, idxRegionsToBeMoved);
-                }
-
-                regionPlans = this.delegator.balanceCluster(userClusterState);
-                if (regionPlans == null) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("User region plan is null.");
-                    }
-                    regionPlans = new ArrayList<RegionPlan>();
-                } else {
-                    updateRegionPlans(regionPlans);
-                }
-                return prepareRegionPlansForClusterState(indexClusterState, 
regionPlans);
-            }
-        }
-    }
-
-    private void updateServer(Map<ImmutableBytesWritable, ServerName> 
tableKeys, ServerName sn,
-            HRegionInfo hri) {
-        ImmutableBytesWritable startKey = new 
ImmutableBytesWritable(hri.getStartKey());
-        ServerName existingServer = tableKeys.get(startKey);
-        if (!sn.equals(existingServer)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("There is a mismatch in the existing server name for 
the region " + hri
-                        + ".  Replacing the server " + existingServer + " with 
" + sn + ".");
-            }
-            tableKeys.put(startKey, sn);
-        }
-    }
-
-    /**
-     * Prepare region plans for cluster state
-     * @param clusterState if balancing is table wise then cluster state 
contains only indexed or
-     *            index table regions, otherwise it contains all index tables 
regions.
-     * @param regionPlans
-     * @return
-     */
-    private List<RegionPlan> prepareRegionPlansForClusterState(
-            Map<ServerName, List<HRegionInfo>> clusterState, List<RegionPlan> 
regionPlans) {
-        if (regionPlans == null) regionPlans = new ArrayList<RegionPlan>();
-        ImmutableBytesWritable startKey = new ImmutableBytesWritable();
-        for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : 
clusterState.entrySet()) {
-            List<HRegionInfo> regionInfos = serverVsRegionList.getValue();
-            ServerName server = serverVsRegionList.getKey();
-            for (HRegionInfo regionInfo : regionInfos) {
-                if (!isTableColocated(regionInfo.getTable())) continue;
-                TableName mappedTableName = 
getMappedTableToColocate(regionInfo.getTable());
-                startKey.set(regionInfo.getStartKey());
-                ServerName sn = 
this.colocationInfo.get(mappedTableName).get(startKey);
-                if (sn.equals(server)) {
-                    continue;
-                } else {
-                    RegionPlan rp = new RegionPlan(regionInfo, server, sn);
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Selected server " + rp.getDestination()
-                                + " as destination for region "
-                                + regionInfo.getRegionNameAsString() + " from 
colocation info.");
-                    }
-                    regionOnline(regionInfo, rp.getDestination());
-                    regionPlans.add(rp);
-                }
-            }
-        }
-        return regionPlans;
-    }
-
-    private void updateRegionPlans(List<RegionPlan> regionPlans) {
-        for (RegionPlan regionPlan : regionPlans) {
-            HRegionInfo hri = regionPlan.getRegionInfo();
-            if (!isTableColocated(hri.getTable())) continue;
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Saving region plan of region " + 
hri.getRegionNameAsString() + '.');
-            }
-            regionOnline(hri, regionPlan.getDestination());
-        }
-    }
-
-    @Override
-    public Map<ServerName, List<HRegionInfo>> 
roundRobinAssignment(List<HRegionInfo> regions,
-            List<ServerName> servers) throws HBaseIOException {
-        List<HRegionInfo> userRegions = new ArrayList<HRegionInfo>();
-        List<HRegionInfo> indexRegions = new ArrayList<HRegionInfo>();
-        for (HRegionInfo hri : regions) {
-            seperateUserAndIndexRegion(hri, userRegions, indexRegions);
-        }
-        Map<ServerName, List<HRegionInfo>> bulkPlan = null;
-        if (!userRegions.isEmpty()) {
-            bulkPlan = this.delegator.roundRobinAssignment(userRegions, 
servers);
-            // This should not happen.
-            if (null == bulkPlan) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("No region plans selected for user regions in 
roundRobinAssignment.");
-                }
-                return null;
-            }
-            savePlan(bulkPlan);
-        }
-        bulkPlan = prepareIndexRegionsPlan(indexRegions, bulkPlan, servers);
-        return bulkPlan;
-    }
-
-    private void seperateUserAndIndexRegion(HRegionInfo hri, List<HRegionInfo> 
userRegions,
-            List<HRegionInfo> indexRegions) {
-        if (indexTableVsUserTable.containsKey(hri.getTable())) {
-            indexRegions.add(hri);
-            return;
-        }
-        userRegions.add(hri);
-    }
-
-    private Map<ServerName, List<HRegionInfo>> prepareIndexRegionsPlan(
-            List<HRegionInfo> indexRegions, Map<ServerName, List<HRegionInfo>> 
bulkPlan,
-            List<ServerName> servers) throws HBaseIOException {
-        if (null != indexRegions && !indexRegions.isEmpty()) {
-            if (null == bulkPlan) {
-                bulkPlan = new ConcurrentHashMap<ServerName, 
List<HRegionInfo>>();
-            }
-            for (HRegionInfo hri : indexRegions) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Preparing region plan for index region "
-                            + hri.getRegionNameAsString() + '.');
-                }
-                ServerName destServer = getDestServerForIdxRegion(hri);
-                List<HRegionInfo> destServerRegions = null;
-                if (destServer == null) destServer = 
this.randomAssignment(hri, servers);
-                if (destServer != null) {
-                    destServerRegions = bulkPlan.get(destServer);
-                    if (null == destServerRegions) {
-                        destServerRegions = new ArrayList<HRegionInfo>();
-                        bulkPlan.put(destServer, destServerRegions);
-                    }
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Server " + destServer + " selected for 
region "
-                                + hri.getRegionNameAsString() + '.');
-                    }
-                    destServerRegions.add(hri);
-                    regionOnline(hri, destServer);
-                }
-            }
-        }
-        return bulkPlan;
-    }
-
-    private ServerName getDestServerForIdxRegion(HRegionInfo hri) {
-        // Every time we calculate the table name because in case of master 
restart the index
-        // regions
-        // may be coming for different index tables.
-        TableName actualTable = getMappedTableToColocate(hri.getTable());
-        ImmutableBytesWritable startkey = new 
ImmutableBytesWritable(hri.getStartKey());
-        synchronized (this.colocationInfo) {
-
-            Map<ImmutableBytesWritable, ServerName> tableKeys = 
colocationInfo.get(actualTable);
-            if (null == tableKeys) {
-                // Can this case come
-                return null;
-            }
-            if (tableKeys.containsKey(startkey)) {
-                // put index region location if corresponding user region 
found in regionLocation
-                // map.
-                ServerName sn = tableKeys.get(startkey);
-                regionOnline(hri, sn);
-                return sn;
-            }
-        }
-        return null;
-    }
-
-    private void savePlan(Map<ServerName, List<HRegionInfo>> bulkPlan) {
-        synchronized (this.colocationInfo) {
-            for (Entry<ServerName, List<HRegionInfo>> e : bulkPlan.entrySet()) 
{
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Saving user regions' plans for server " + 
e.getKey() + '.');
-                }
-                for (HRegionInfo hri : e.getValue()) {
-                    if (!isTableColocated(hri.getTable())) continue;
-                    regionOnline(hri, e.getKey());
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Saved user regions' plans for server " + 
e.getKey() + '.');
-                }
-            }
-        }
-    }
-
-    @Override
-    public Map<ServerName, List<HRegionInfo>> retainAssignment(
-            Map<HRegionInfo, ServerName> regions, List<ServerName> servers) 
throws HBaseIOException {
-        Map<HRegionInfo, ServerName> userRegionsMap =
-                new ConcurrentHashMap<HRegionInfo, ServerName>();
-        List<HRegionInfo> indexRegions = new ArrayList<HRegionInfo>();
-        for (Entry<HRegionInfo, ServerName> e : regions.entrySet()) {
-            seperateUserAndIndexRegion(e, userRegionsMap, indexRegions, 
servers);
-        }
-        Map<ServerName, List<HRegionInfo>> bulkPlan = null;
-        if (!userRegionsMap.isEmpty()) {
-            bulkPlan = this.delegator.retainAssignment(userRegionsMap, 
servers);
-            if (bulkPlan == null) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Empty region plan for user regions.");
-                }
-                return null;
-            }
-            savePlan(bulkPlan);
-        }
-        bulkPlan = prepareIndexRegionsPlan(indexRegions, bulkPlan, servers);
-        return bulkPlan;
-    }
-
-    private void seperateUserAndIndexRegion(Entry<HRegionInfo, ServerName> e,
-            Map<HRegionInfo, ServerName> userRegionsMap, List<HRegionInfo> 
indexRegions,
-            List<ServerName> servers) {
-        HRegionInfo hri = e.getKey();
-        if (indexTableVsUserTable.containsKey(hri.getTable())) {
-            indexRegions.add(hri);
-            return;
-        }
-        if (e.getValue() == null) {
-            userRegionsMap.put(hri, 
servers.get(RANDOM.nextInt(servers.size())));
-        } else {
-            userRegionsMap.put(hri, e.getValue());
-        }
-    }
-
-    @Override
-    public Map<HRegionInfo, ServerName> immediateAssignment(List<HRegionInfo> 
regions,
-            List<ServerName> servers) throws HBaseIOException {
-        return this.delegator.immediateAssignment(regions, servers);
-    }
-
-    @Override
-    public ServerName randomAssignment(HRegionInfo regionInfo, 
List<ServerName> servers)
-            throws HBaseIOException {
-        if (!isTableColocated(regionInfo.getTable())) {
-            return this.delegator.randomAssignment(regionInfo, servers);
-        }
-        ServerName sn = getServerNameFromMap(regionInfo, servers);
-        if (sn == null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("No server found for region " + 
regionInfo.getRegionNameAsString() + '.');
-            }
-            sn = getRandomServer(regionInfo, servers);
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Destination server for region " + 
regionInfo.getRegionNameAsString()
-                    + " is " + ((sn == null) ? "null" : sn.toString()) + '.');
-        }
-        return sn;
-    }
-
-    private ServerName getRandomServer(HRegionInfo regionInfo, 
List<ServerName> servers)
-            throws HBaseIOException {
-        ServerName sn = null;
-        sn = this.delegator.randomAssignment(regionInfo, servers);
-        if (sn == null) return null;
-        regionOnline(regionInfo, sn);
-        return sn;
-    }
-
-    private ServerName getServerNameFromMap(HRegionInfo regionInfo, 
List<ServerName> onlineServers) {
-        TableName tableName = regionInfo.getTable();
-        TableName mappedTable = 
getMappedTableToColocate(regionInfo.getTable());
-        ImmutableBytesWritable startKey = new 
ImmutableBytesWritable(regionInfo.getStartKey());
-        synchronized (this.colocationInfo) {
-            Map<ImmutableBytesWritable, ServerName> correspondingTableKeys =
-                    this.colocationInfo.get(mappedTable);
-            Map<ImmutableBytesWritable, ServerName> actualTableKeys =
-                    this.colocationInfo.get(tableName);
-
-            if (null != correspondingTableKeys) {
-                if (correspondingTableKeys.containsKey(startKey)) {
-                    ServerName previousServer = null;
-                    if (null != actualTableKeys) {
-                        previousServer = actualTableKeys.get(startKey);
-                    }
-                    ServerName sn = correspondingTableKeys.get(startKey);
-                    if (null != previousServer) {
-                        // if servername of index region and user region are 
same in colocationInfo
-                        // clean
-                        // previous plans and return null
-                        if (previousServer.equals(sn)) {
-                            correspondingTableKeys.remove(startKey);
-                            actualTableKeys.remove(startKey);
-                            if (LOG.isDebugEnabled()) {
-                                LOG
-                                        .debug("Both user region plan and 
corresponding index region plan "
-                                                + "in colocation info are 
same. Hence clearing the plans to select new plan"
-                                                + " for the region "
-                                                + 
regionInfo.getRegionNameAsString() + ".");
-                            }
-                            return null;
-                        }
-                    }
-                    if (sn != null && onlineServers.contains(sn)) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Updating the region plan of the region "
-                                    + regionInfo.getRegionNameAsString() + " 
with server " + sn);
-                        }
-                        regionOnline(regionInfo, sn);
-                        return sn;
-                    } else if (sn != null) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("The location " + sn + " of region with 
start key"
-                                    + 
Bytes.toStringBinary(regionInfo.getStartKey())
-                                    + " is not in online. Selecting other 
region server.");
-                        }
-                        return null;
-                    }
-                }
-            } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("No region plans in colocationInfo for table " + 
mappedTable);
-                }
-            }
-            return null;
-        }
-    }
-
-    @Override
-    public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
-        TableName tableName = regionInfo.getTable();
-        synchronized (this.colocationInfo) {
-            Map<ImmutableBytesWritable, ServerName> tabkeKeys = 
this.colocationInfo.get(tableName);
-            if (tabkeKeys == null) {
-                tabkeKeys = new ConcurrentHashMap<ImmutableBytesWritable, 
ServerName>();
-                this.colocationInfo.put(tableName, tabkeKeys);
-            }
-            tabkeKeys.put(new 
ImmutableBytesWritable(regionInfo.getStartKey()), sn);
-        }
-    }
-
-    public void clearTableRegionPlans(TableName tableName) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Clearing regions plans from colocationInfo for table " 
+ tableName);
-        }
-        synchronized (this.colocationInfo) {
-            this.colocationInfo.remove(tableName);
-        }
-    }
-
-    @Override
-    public void regionOffline(HRegionInfo regionInfo) {
-        TableName tableName = regionInfo.getTable();
-        synchronized (this.colocationInfo) {
-            Map<ImmutableBytesWritable, ServerName> tableKeys = 
this.colocationInfo.get(tableName);
-            if (null == tableKeys) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("No regions of table " + tableName + " in the 
colocationInfo.");
-                }
-            } else {
-                tableKeys.remove(new 
ImmutableBytesWritable(regionInfo.getStartKey()));
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("The regioninfo " + regionInfo + " removed from 
the colocationInfo");
-                }
-            }
-        }
-    }
-
-    @Override
-    public boolean isStopped() {
-        return stopped;
-    }
-
-    @Override
-    public void stop(String why) {
-        LOG.info("Load Balancer stop requested: " + why);
-        stopped = true;
-    }
-
-    public void populateTablesToColocate(Map<String, HTableDescriptor> 
tableDescriptors) {
-        HTableDescriptor desc = null;
-        for (Entry<String, HTableDescriptor> entry : 
tableDescriptors.entrySet()) {
-            desc = entry.getValue();
-            if (desc.getValue(PARENT_TABLE_KEY) != null) {
-                
addTablesToColocate(TableName.valueOf(desc.getValue(PARENT_TABLE_KEY)), desc
-                        .getTableName());
-            }
-        }
-    }
-
-    /**
-     * Add tables whose regions to co-locate.
-     * @param userTable
-     * @param indexTable
-     */
-    public void addTablesToColocate(TableName userTable, TableName indexTable) 
{
-        if (userTable.equals(indexTable)) {
-            throw new IllegalArgumentException("Tables to colocate should not 
be same.");
-        } else if (isTableColocated(userTable)) {
-            throw new IllegalArgumentException("User table already colocated 
with table "
-                    + getMappedTableToColocate(userTable));
-        } else if (isTableColocated(indexTable)) {
-            throw new IllegalArgumentException("Index table is already 
colocated with table "
-                    + getMappedTableToColocate(indexTable));
-        }
-        userTableVsIndexTable.put(userTable, indexTable);
-        indexTableVsUserTable.put(indexTable, userTable);
-    }
-
-    /**
-     * Removes the specified table and corresponding table from co-location.
-     * @param table
-     */
-    public void removeTablesFromColocation(TableName table) {
-        TableName other = userTableVsIndexTable.remove(table);
-        if (other != null) {
-            indexTableVsUserTable.remove(other);
-        } else {
-            other = indexTableVsUserTable.remove(table);
-            if (other != null) userTableVsIndexTable.remove(other);
-        }
-    }
-
-    /**
-     * Return mapped table to co-locate.
-     * @param tableName
-     * @return index table if the specified table is user table or vice versa.
-     */
-    public TableName getMappedTableToColocate(TableName tableName) {
-        TableName other = userTableVsIndexTable.get(tableName);
-        return other == null ? indexTableVsUserTable.get(tableName) : other;
-    }
 
-    public boolean isTableColocated(TableName table) {
-        return userTableVsIndexTable.containsKey(table) || 
indexTableVsUserTable.containsKey(table);
-    }
+public class IndexLoadBalancer extends StochasticLoadBalancer {
 
-    /**
-     * Populates table's region locations into co-location info from master.
-     * @param table
-     */
-    public void populateRegionLocations(TableName table) {
-        synchronized (this.colocationInfo) {
-            if (!isTableColocated(table)) {
-                throw new IllegalArgumentException("Specified table " + table
-                        + " should be in one of the tables to co-locate.");
-            }
-            RegionStates regionStates = 
this.master.getAssignmentManager().getRegionStates();
-            List<HRegionInfo> onlineRegions = 
regionStates.getRegionsOfTable(table);
-            for (HRegionInfo hri : onlineRegions) {
-                regionOnline(hri, regionStates.getRegionServerOfRegion(hri));
-            }
-            Map<String, RegionState> regionsInTransition = 
regionStates.getRegionsInTransition();
-            for (RegionState regionState : regionsInTransition.values()) {
-                if (table.equals(regionState.getRegion().getTable())
-                        && regionState.getServerName() != null) {
-                    regionOnline(regionState.getRegion(), 
regionState.getServerName());
-                }
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
index a014da2..2f83f8d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
@@ -17,98 +17,12 @@
  */
 package org.apache.phoenix.hbase.index.master;
 
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
-import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.LoadBalancer;
-import org.apache.hadoop.hbase.master.RegionPlan;
-import org.apache.hadoop.hbase.master.RegionStates;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.hbase.index.balancer.IndexLoadBalancer;
-import org.apache.phoenix.util.MetaDataUtil;
 
 /**
  * Defines of coprocessor hooks(to support secondary indexing) of operations on
  * {@link org.apache.hadoop.hbase.master.HMaster} process.
  */
 public class IndexMasterObserver extends BaseMasterObserver {
-    IndexLoadBalancer balancer = null;
-
-    @Override
-    public void 
preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> ctx)
-            throws IOException {
-        LoadBalancer loadBalancer =
-                
ctx.getEnvironment().getMasterServices().getAssignmentManager().getBalancer();
-        if (loadBalancer instanceof IndexLoadBalancer) {
-            balancer = (IndexLoadBalancer) loadBalancer;
-        }
-        super.preMasterInitialization(ctx);
-    }
-
-    @Override
-    public void 
preCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
-            HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
-        TableName userTableName = null;
-        if (balancer != null && 
desc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) != null) {
-            userTableName =
-                    
TableName.valueOf(desc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY));
-            balancer.addTablesToColocate(userTableName, desc.getTableName());
-        }
-        if (userTableName != null) 
balancer.populateRegionLocations(userTableName);
-        super.preCreateTableHandler(ctx, desc, regions);
-    }
-
-    @Override
-    public void 
preModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
-            TableName tableName, HTableDescriptor htd) throws IOException {
-        HTableDescriptor oldDesc =
-                
ctx.getEnvironment().getMasterServices().getTableDescriptors().get(tableName);
-        if (oldDesc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) == null
-                && htd.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) != null) {
-            TableName userTableName =
-                    
TableName.valueOf(htd.getValue(IndexLoadBalancer.PARENT_TABLE_KEY));
-            balancer.addTablesToColocate(userTableName, htd.getTableName());
-        }
-        super.preModifyTableHandler(ctx, tableName, htd);
-    }
-
-    @Override
-    public void postMove(ObserverContext<MasterCoprocessorEnvironment> ctx, 
HRegionInfo region,
-            ServerName srcServer, ServerName destServer) throws IOException {
-        if (balancer != null && balancer.isTableColocated(region.getTable())) {
-            AssignmentManager am = 
ctx.getEnvironment().getMasterServices().getAssignmentManager();
-            RegionStates regionStates = am.getRegionStates();
-            String tableName = region.getTable().getNameAsString();
-            String correspondingTable = 
MetaDataUtil.isLocalIndex(region.getTable().getNameAsString())
-                    ? MetaDataUtil.getUserTableName(tableName)
-                    : 
Bytes.toString(MetaDataUtil.getLocalIndexPhysicalName(tableName.getBytes()));
-            List<HRegionInfo> regions =
-                    
regionStates.getRegionsOfTable(TableName.valueOf(correspondingTable));
-            for (HRegionInfo hri : regions) {
-                if (Bytes.compareTo(region.getStartKey(), hri.getStartKey()) 
== 0
-                        && destServer != null) {
-                    balancer.regionOnline(hri, destServer);
-                    am.addPlan(hri.getEncodedName(), new RegionPlan(hri, null, 
destServer));
-                    am.unassign(hri);
-                }
-            }
-        }
-        super.postMove(ctx, region, srcServer, destServer);
-    }
 
-    @Override
-    public void 
postDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
-            TableName tableName) throws IOException {
-        if (balancer != null && balancer.isTableColocated(tableName)) {
-            balancer.removeTablesFromColocation(tableName);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
index d7fef5e..5e3f3ed 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
@@ -32,6 +32,6 @@ public interface IndexCommitter extends Stoppable {
 
   void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String 
name);
 
-  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, 
boolean allowLocalUpdates)
       throws IndexWriteException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
index 30797b2..cbcec3b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -128,10 +128,11 @@ public class IndexWriter implements Stoppable {
    * @param indexUpdates Updates to write
  * @throws IOException 
    */
-  public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> 
indexUpdates) throws IOException  {
+    public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, 
byte[]>> indexUpdates,
+            boolean allowLocalUpdates) throws IOException {
     // convert the strings to htableinterfaces to which we can talk and group 
by TABLE
     Multimap<HTableInterfaceReference, Mutation> toWrite = 
resolveTableReferences(indexUpdates);
-    writeAndKillYourselfOnFailure(toWrite);
+    writeAndKillYourselfOnFailure(toWrite, allowLocalUpdates);
   }
 
   /**
@@ -139,9 +140,10 @@ public class IndexWriter implements Stoppable {
    * @param toWrite
  * @throws IOException 
    */
-  public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, 
Mutation> toWrite) throws IOException {
+    public void 
writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> 
toWrite,
+            boolean allowLocalUpdates) throws IOException {
     try {
-      write(toWrite);
+      write(toWrite, allowLocalUpdates);
       if (LOG.isTraceEnabled()) {
         LOG.trace("Done writing all index updates!\n\t" + toWrite);
       }
@@ -165,21 +167,24 @@ public class IndexWriter implements Stoppable {
    * @throws IndexWriteException if we cannot successfully write to the index. 
Whether or not we
    *           stop early depends on the {@link IndexCommitter}.
    */
-  public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws 
IndexWriteException {
-    write(resolveTableReferences(toWrite));
-  }
+    public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws 
IndexWriteException {
+       write(resolveTableReferences(toWrite), false);
+    }
+
+    public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean 
allowLocalUpdates) throws IndexWriteException {
+       write(resolveTableReferences(toWrite), allowLocalUpdates);
+    }
 
   /**
    * see {@link #write(Collection)}
    * @param toWrite
    * @throws IndexWriteException
    */
-  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
-      throws IndexWriteException {
-    this.writer.write(toWrite);
+  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, 
boolean allowLocalUpdates)
+             throws IndexWriteException {
+         this.writer.write(toWrite, allowLocalUpdates);
   }
 
-
   /**
    * Convert the passed index updates to {@link HTableInterfaceReference}s.
    * @param indexUpdates from the index builder

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index 233dc57..0dc11bc 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -21,8 +21,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -98,7 +100,7 @@ public class ParallelWriterIndexCommitter implements 
IndexCommitter {
     }
 
     @Override
-    public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) 
throws SingleIndexWriteFailureException {
+    public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, 
final boolean allowLocalUpdates) throws SingleIndexWriteFailureException {
         /*
          * This bit here is a little odd, so let's explain what's going on. 
Basically, we want to do the writes in
          * parallel to each index table, so each table gets its own task and 
is submitted to the pool. Where it gets
@@ -116,7 +118,12 @@ public class ParallelWriterIndexCommitter implements 
IndexCommitter {
             // doing a complete copy over of all the index update for each 
table.
             final List<Mutation> mutations = 
kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue());
             final HTableInterfaceReference tableReference = entry.getKey();
-            final RegionCoprocessorEnvironment env = this.env;
+                       if (env != null
+                                       && !allowLocalUpdates
+                                       && tableReference.getTableName().equals(
+                                                       
env.getRegion().getTableDesc().getNameAsString())) {
+                               continue;
+                       }
             /*
              * Write a batch of index updates to an index table. This 
operation stops (is cancelable) via two
              * mechanisms: (1) setting aborted or stopped on the IndexWriter 
or, (2) interrupting the running thread.
@@ -145,29 +152,14 @@ public class ParallelWriterIndexCommitter implements 
IndexCommitter {
                         LOG.debug("Writing index update:" + mutations + " to 
table: " + tableReference);
                     }
                     try {
-                        // TODO: Once HBASE-11766 is fixed, reexamine whether 
this is necessary.
-                        // Also, checking the prefix of the table name to 
determine if this is a local
-                        // index is pretty hacky. If we're going to keep this, 
we should revisit that
-                        // as well.
-                        try {
-                            if 
(MetaDataUtil.isLocalIndex(tableReference.getTableName())) {
-                                Region indexRegion = 
IndexUtil.getIndexRegion(env);
-                                if (indexRegion != null) {
-                                    throwFailureIfDone();
-                                    
indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]),
-                                        HConstants.NO_NONCE, 
HConstants.NO_NONCE);
-                                    return null;
-                                }
-                            }
-                        } catch (IOException ignord) {
-                            // when it's failed we fall back to the standard & 
slow way
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("indexRegion.batchMutate failed and 
fall back to HTable.batch(). Got error="
-                                        + ignord);
-                            }
-                        }
+                                               if (allowLocalUpdates) {
+                                                       for (Mutation m : 
mutations) {
+                                                               
m.setDurability(Durability.SKIP_WAL);
+                                                       }
+                                               }
                         HTableInterface table = 
factory.getTable(tableReference.get());
                         throwFailureIfDone();
+                        int i = 0;
                         table.batch(mutations);
                     } catch (SingleIndexWriteFailureException e) {
                         throw e;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
index 14768ac..fec74ca 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -110,7 +111,7 @@ public class TrackingParallelWriterIndexCommitter 
implements IndexCommitter {
     }
 
     @Override
-    public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) 
throws MultiIndexWriteFailureException {
+    public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, 
final boolean allowLocalUpdates) throws MultiIndexWriteFailureException {
         Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = 
toWrite.asMap().entrySet();
         TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
         List<HTableInterfaceReference> tables = new 
ArrayList<HTableInterfaceReference>(entries.size());
@@ -121,6 +122,12 @@ public class TrackingParallelWriterIndexCommitter 
implements IndexCommitter {
             // track each reference so we can get at it easily later, when 
determing failures
             final HTableInterfaceReference tableReference = entry.getKey();
             final RegionCoprocessorEnvironment env = this.env;
+                       if (env != null
+                                       && !allowLocalUpdates
+                                       && tableReference.getTableName().equals(
+                                                       
env.getRegion().getTableDesc().getNameAsString())) {
+                               continue;
+                       }
             tables.add(tableReference);
 
             /*
@@ -144,33 +151,16 @@ public class TrackingParallelWriterIndexCommitter 
implements IndexCommitter {
                     try {
                         // this may have been queued, but there was an 
abort/stop so we try to early exit
                         throwFailureIfDone();
+                                               if (allowLocalUpdates) {
+                                                       for (Mutation m : 
mutations) {
+                                                               
m.setDurability(Durability.SKIP_WAL);
+                                                       }
+                                               }
 
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Writing index update:" + mutations + " 
to table: " + tableReference);
                         }
 
-                        try {
-                            // TODO: Once HBASE-11766 is fixed, reexamine 
whether this is necessary.
-                            // Also, checking the prefix of the table name to 
determine if this is a local
-                            // index is pretty hacky. If we're going to keep 
this, we should revisit that
-                            // as well.
-                            if 
(MetaDataUtil.isLocalIndex(tableReference.getTableName())) {
-                                Region indexRegion = 
IndexUtil.getIndexRegion(env);
-                                if (indexRegion != null) {
-                                    throwFailureIfDone();
-                                    
indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]),
-                                        HConstants.NO_NONCE, 
HConstants.NO_NONCE);
-                                    return Boolean.TRUE;
-                                }
-                            }
-                        } catch (IOException ignord) {
-                            // when it's failed we fall back to the standard & 
slow way
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("indexRegion.batchMutate failed and 
fall back to HTable.batch(). Got error="
-                                        + ignord);
-                            }
-                        }
-
                         HTableInterface table = 
factory.getTable(tableReference.get());
                         throwFailureIfDone();
                         table.batch(mutations);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 1725b11..6a316d9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -274,6 +274,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
     // columns required to evaluate all expressions in indexedExpressions 
(this does not include columns in the data row key)
     private Set<ColumnReference> indexedColumns;
     private Set<ColumnReference> coveredColumns;
+    private Map<ColumnReference, ColumnReference> coveredColumnsMap;
     // columns required to create index row i.e. indexedColumns + 
coveredColumns  (this does not include columns in the data row key)
     private Set<ColumnReference> allColumns;
     // TODO remove this in the next major release
@@ -363,6 +364,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         this.indexedColumnTypes = 
Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
         this.indexedExpressions = 
Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
         this.coveredColumns = 
Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns);
+        this.coveredColumnsMap = 
Maps.newHashMapWithExpectedSize(nIndexColumns-nIndexPKColumns);
         this.nIndexSaltBuckets  = nIndexSaltBuckets == null ? 0 : 
nIndexSaltBuckets;
         this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable);
         this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index);
@@ -430,6 +432,14 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             for (PColumn indexColumn : family.getColumns()) {
                 PColumn column = IndexUtil.getDataColumn(dataTable, 
indexColumn.getName().getString());
                 this.coveredColumns.add(new 
ColumnReference(column.getFamilyName().getBytes(), 
column.getName().getBytes()));
+                if(isLocalIndex) {
+                    this.coveredColumnsMap.put(
+                        new ColumnReference(column.getFamilyName().getBytes(), 
column.getName()
+                                .getBytes()),
+                        new ColumnReference(isLocalIndex ? 
Bytes.toBytes(IndexUtil
+                                
.getLocalIndexColumnFamily(column.getFamilyName().getString()))
+                                : column.getFamilyName().getBytes(), 
column.getName().getBytes()));
+                }
             }
         }
         this.estimatedIndexRowKeyBytes = 
estimateIndexRowKeyByteSize(indexColByteSize);
@@ -861,7 +871,12 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                     put.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL);
                 }
                 //this is a little bit of extra work for installations that 
are running <0.94.14, but that should be rare and is a short-term set of 
wrappers - it shouldn't kill GC
-                put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), 
cq, ts, value));
+                if(this.isLocalIndex) {
+                    ColumnReference columnReference = 
this.coveredColumnsMap.get(ref);
+                                       put.add(kvBuilder.buildPut(rowKey, 
columnReference.getFamilyWritable(), cq, ts, value));
+                } else {
+                    put.add(kvBuilder.buildPut(rowKey, 
ref.getFamilyWritable(), cq, ts, value));
+                }
             }
         }
         return put;
@@ -949,11 +964,17 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             // If table delete was single version, then index delete should be 
as well
             if (deleteType == DeleteType.SINGLE_VERSION) {
                 for (ColumnReference ref : getCoverededColumns()) { // FIXME: 
Keep Set<byte[]> for index CFs?
+                    if(this.isLocalIndex) {
+                                               ref = 
this.coveredColumnsMap.get(ref);
+                    }
                     delete.deleteFamilyVersion(ref.getFamily(), ts);
                 }
                 delete.deleteFamilyVersion(emptyCF, ts);
             } else {
                 for (ColumnReference ref : getCoverededColumns()) { // FIXME: 
Keep Set<byte[]> for index CFs?
+                    if(this.isLocalIndex) {
+                                               ref = 
this.coveredColumnsMap.get(ref);
+                    }
                     delete.deleteFamily(ref.getFamily(), ts);
                 }
                 delete.deleteFamily(emptyCF, ts);
@@ -971,11 +992,15 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                         delete = new Delete(indexRowKey);                    
                         delete.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL);
                     }
+                    ColumnReference columnReference = ref;
+                    if(this.isLocalIndex) {
+                        columnReference = this.coveredColumnsMap.get(ref);
+                    }
                     // If point delete for data table, then use point delete 
for index as well
                     if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) {
-                        delete.deleteColumn(ref.getFamily(), 
IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                        delete.deleteColumn(columnReference.getFamily(), 
IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
                     } else {
-                        delete.deleteColumns(ref.getFamily(), 
IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                        delete.deleteColumns(columnReference.getFamily(), 
IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
                     }
                 }
             }
@@ -1030,10 +1055,15 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0;
         int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1;
         coveredColumns = 
Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns);
+        coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns);
         for (int i = 0; i < nCoveredColumns; i++) {
             byte[] cf = Bytes.readByteArray(input);
             byte[] cq = Bytes.readByteArray(input);
-            coveredColumns.add(new ColumnReference(cf,cq));
+            ColumnReference ref = new ColumnReference(cf,cq);
+            coveredColumns.add(ref);
+            if(isLocalIndex) {
+                coveredColumnsMap.put(ref, new 
ColumnReference(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(Bytes.toString(cf))),
 cq));
+            }
         }
         // Hack to serialize whether the index row key is optimizable
         int len = WritableUtils.readVInt(input);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 8ad4d3e..9d2955b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -71,7 +71,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
             Pair<ValueGetter, IndexUpdate> statePair = 
state.getIndexUpdateState(maintainer.getAllColumns(), 
metaData.ignoreNewerMutations());
             ValueGetter valueGetter = statePair.getFirst();
             IndexUpdate indexUpdate = statePair.getSecond();
-            indexUpdate.setTable(maintainer.getIndexTableName());
+            indexUpdate.setTable(maintainer.isLocalIndex() ? 
state.getEnvironment().getRegion()
+                    .getTableDesc().getName() : 
maintainer.getIndexTableName());
             Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, 
ptr, state.getCurrentTimestamp(), env
                     .getRegion().getRegionInfo().getStartKey(), 
env.getRegion().getRegionInfo().getEndKey());
             indexUpdate.setUpdate(put);
@@ -95,7 +96,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
             Pair<ValueGetter, IndexUpdate> statePair = 
state.getIndexUpdateState(maintainer.getAllColumns(), 
metaData.ignoreNewerMutations());
             ValueGetter valueGetter = statePair.getFirst();
             IndexUpdate indexUpdate = statePair.getSecond();
-            indexUpdate.setTable(maintainer.getIndexTableName());
+            indexUpdate.setTable(maintainer.isLocalIndex() ? 
state.getEnvironment().getRegion()
+                    .getTableDesc().getName() : 
maintainer.getIndexTableName());
             Delete delete = maintainer.buildDeleteMutation(KV_BUILDER, 
valueGetter, ptr, state.getPendingUpdate(),
                     state.getCurrentTimestamp(), 
env.getRegion().getRegionInfo().getStartKey(), 
env.getRegion().getRegionInfo().getEndKey());
             indexUpdate.setUpdate(delete);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 17da04e..d7850ba 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -138,7 +138,7 @@ public class PhoenixIndexFailurePolicy extends 
DelegateIndexFailurePolicy {
             }
 
             // its a local index table, so we need to convert it to the index 
table names we should disable
-            if (MetaDataUtil.isLocalIndex(ref.getTableName())) {
+            if 
(MetaDataUtil.hasLocalIndexColumnFamily(env.getRegion().getTableDesc())) {
                 for (String tableName : getLocalIndexNames(ref, mutations)) {
                     indexTableNames.put(tableName, minTimeStamp);
                 }
@@ -224,8 +224,7 @@ public class PhoenixIndexFailurePolicy extends 
DelegateIndexFailurePolicy {
         try {
             conn = 
QueryUtil.getConnectionOnServer(this.env.getConfiguration()).unwrap(
                     PhoenixConnection.class);
-            String userTableName = 
MetaDataUtil.getUserTableName(ref.getTableName());
-            PTable dataTable = PhoenixRuntime.getTable(conn, userTableName);
+            PTable dataTable = PhoenixRuntime.getTable(conn, 
ref.getTableName());
             List<PTable> indexes = dataTable.getIndexes();
             // local index used to get view id from index mutation row key.
             PTable localIndex = null;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 5d8879c..3d8124c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
@@ -62,6 +63,7 @@ import org.apache.phoenix.hbase.index.covered.TableState;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.query.KeyRange;
@@ -69,6 +71,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
@@ -165,7 +168,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
 
             // no index updates, so we are done
             if (!indexUpdates.isEmpty()) {
-                this.writer.write(indexUpdates);
+                this.writer.write(indexUpdates, true);
             }
         } catch (Throwable t) {
             String msg = "Failed to update index with entries:" + indexUpdates;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 7f403b0..3e0fd99 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -176,7 +176,7 @@ public abstract class ExplainTable {
         }
     }
 
-    private void appendPKColumnValue(StringBuilder buf, byte[] range, Boolean 
isNull, int slotIndex) {
+    private void appendPKColumnValue(StringBuilder buf, byte[] range, Boolean 
isNull, int slotIndex, boolean changeViewIndexId) {
         if (Boolean.TRUE.equals(isNull)) {
             buf.append("null");
             return;
@@ -198,8 +198,14 @@ public abstract class ExplainTable {
             type.coerceBytes(ptr, type, sortOrder, SortOrder.getDefault());
             range = ptr.get();
         }
-        Format formatter = context.getConnection().getFormatter(type);
-        buf.append(type.toStringLiteral(range, formatter));
+        if (changeViewIndexId) {
+            Short s = (Short) type.toObject(range);
+            s = (short) (s + (-Short.MAX_VALUE));
+            buf.append(s.toString());
+        } else {
+            Format formatter = context.getConnection().getFormatter(type);
+            buf.append(type.toStringLiteral(range, formatter));
+        }
     }
     
     private static class RowKeyValueIterator implements Iterator<byte[]> {
@@ -257,6 +263,7 @@ public abstract class ExplainTable {
                 minMaxIterator = new RowKeyValueIterator(schema, 
minMaxRange.getRange(bound));
             }
         }
+        boolean isLocalIndex = ScanUtil.isLocalIndex(context.getScan());
         boolean forceSkipScan = this.hint.hasHint(Hint.SKIP_SCAN);
         int nRanges = forceSkipScan ? scanRanges.getRanges().size() : 
scanRanges.getBoundSlotCount();
         for (int i = 0, minPos = 0; minPos < nRanges || 
minMaxIterator.hasNext(); i++) {
@@ -275,7 +282,13 @@ public abstract class ExplainTable {
                     minMaxIterator = Iterators.emptyIterator();
                 }
             }
-            appendPKColumnValue(buf, b, isNull, i);
+            if (isLocalIndex
+                    && ((context.getConnection().getTenantId() != null && i == 
1) || (context
+                            .getConnection().getTenantId() == null && i == 
0))) {
+                appendPKColumnValue(buf, b, isNull, i, true);
+            } else {
+                appendPKColumnValue(buf, b, isNull, i, false);
+            }
             buf.append(',');
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index 0525de9..41c39a3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -395,13 +395,8 @@ public abstract class AbstractBulkLoadTool extends 
Configured implements Tool {
         PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
         List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>();
         for(PTable indexTable : table.getIndexes()){
-            if (indexTable.getIndexType() == PTable.IndexType.LOCAL) {
-                indexTables.add(new 
TargetTableRef(indexTable.getName().getString(),
-                        
Bytes.toString(MetaDataUtil.getLocalIndexPhysicalName(table.getPhysicalName().getBytes()))));
-            } else {
-                indexTables.add(
-                        new TargetTableRef(indexTable.getName().getString(), 
indexTable.getPhysicalName().getString()));
-            }
+            indexTables.add(new 
TargetTableRef(indexTable.getName().getString(), indexTable
+                    .getPhysicalName().getString()));
         }
         return indexTables;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 6743688..c93a58b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -206,8 +206,7 @@ public class IndexTool extends Configured implements Tool {
             // computed from the qDataTable name.
             String physicalIndexTable = 
pindexTable.getPhysicalName().getString();
             if (IndexType.LOCAL.equals(pindexTable.getIndexType())) {
-                physicalIndexTable = Bytes
-                        
.toString(MetaDataUtil.getLocalIndexPhysicalName(pdataTable.getPhysicalName().getBytes()));
+                physicalIndexTable = qDataTable;
             }
 
             final PhoenixConnection pConnection = 
connection.unwrap(PhoenixConnection.class);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f593dd0..c29c0bf 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -81,7 +81,6 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator;
-import org.apache.hadoop.hbase.regionserver.LocalIndexSplitter;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -850,18 +849,14 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                         null, priority, null);
             }
 
-            if 
(descriptor.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null
-                    && 
Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(descriptor
-                            
.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
-                if 
(!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())) 
{
-                    
descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(),
-                        null, priority, null);
-                }
-            } else {
-                if 
(!descriptor.hasCoprocessor(LocalIndexSplitter.class.getName())
-                        && !SchemaUtil.isMetaTable(tableName)
-                        && !SchemaUtil.isSequenceTable(tableName)) {
-                    
descriptor.addCoprocessor(LocalIndexSplitter.class.getName(), null, priority, 
null);
+            Set<byte[]> familiesKeys = descriptor.getFamiliesKeys();
+            for(byte[] family: familiesKeys) {
+                
if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX))
 {
+                    if 
(!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())) 
{
+                        
descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(),
+                            null, priority, null);
+                        break;
+                    }
                 }
             }
 
@@ -1071,8 +1066,19 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             } else {
                 if (isMetaTable) {
                     
checkClientServerCompatibility(SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES,
 this.getProps()).getName());
+                } else {
+                    for(Pair<byte[],Map<String,Object>> family: families) {
+                        if 
((newDesc.getValue(HTableDescriptor.SPLIT_POLICY)==null || 
!newDesc.getValue(HTableDescriptor.SPLIT_POLICY).equals(
+                            IndexRegionSplitPolicy.class.getName()))
+                                && 
Bytes.toString(family.getFirst()).startsWith(
+                                    
QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+                                   
newDesc.setValue(HTableDescriptor.SPLIT_POLICY, 
IndexRegionSplitPolicy.class.getName());
+                                   break;
+                           }
+                    }
                 }
 
+
                 if (!modifyExistingMetaData) {
                     return existingDesc; // Caller already knows that no 
metadata was changed
                 }
@@ -1303,60 +1309,6 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
     }
 
-    private void ensureLocalIndexTableCreated(byte[] physicalTableName, 
Map<String, Object> tableProps,
-            List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, 
long timestamp,
-            boolean isNamespaceMapped) throws SQLException {
-        PTable table;
-        String parentTableName = SchemaUtil
-                
.getParentTableNameFromIndexTable(Bytes.toString(physicalTableName),
-                        MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)
-                .replace(QueryConstants.NAMESPACE_SEPARATOR, 
QueryConstants.NAME_SEPARATOR);
-        try {
-            synchronized (latestMetaDataLock) {
-                throwConnectionClosedIfNullMetaData();
-                table = latestMetaData.getTableRef(new 
PTableKey(PName.EMPTY_NAME, parentTableName)).getTable();
-                latestMetaDataLock.notifyAll();
-            }
-            if (table.getTimeStamp() >= timestamp) { // Table in cache is 
newer than client timestamp which shouldn't be the case
-                throw new 
TableNotFoundException(table.getSchemaName().getString(), 
table.getTableName().getString());
-            }
-        } catch (TableNotFoundException e) {
-            byte[] schemaName = 
Bytes.toBytes(SchemaUtil.getSchemaNameFromFullName(parentTableName));
-            byte[] tableName = 
Bytes.toBytes(SchemaUtil.getTableNameFromFullName(parentTableName));
-            MetaDataMutationResult result = this.getTable(null, schemaName, 
tableName, HConstants.LATEST_TIMESTAMP, timestamp);
-            table = result.getTable();
-            if (table == null) {
-                throw e;
-            }
-        }
-        ensureLocalIndexTableCreated(physicalTableName, tableProps, families, 
splits, isNamespaceMapped);
-    }
-
-    private void ensureLocalIndexTableCreated(byte[] physicalTableName, 
Map<String, Object> tableProps,
-            List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, 
boolean isNamespaceMapped)
-                    throws SQLException, TableAlreadyExistsException {
-        
-        // If we're not allowing local indexes or the hbase version is too low,
-        // don't create the local index table
-        if (   
!this.getProps().getBoolean(QueryServices.ALLOW_LOCAL_INDEX_ATTRIB, 
QueryServicesOptions.DEFAULT_ALLOW_LOCAL_INDEX) 
-            || !this.supportsFeature(Feature.LOCAL_INDEX)) {
-                    return;
-        }
-        
-        tableProps.put(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME, 
TRUE_BYTES_AS_STRING);
-        HTableDescriptor desc = ensureTableCreated(physicalTableName, 
PTableType.TABLE, tableProps, families, splits,
-                true, isNamespaceMapped);
-        if (desc != null) {
-            if 
(!Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES))))
 {
-                String fullTableName = Bytes.toString(physicalTableName);
-                throw new TableAlreadyExistsException(
-                        "Unable to create shared physical table for local 
indexes.",
-                        SchemaUtil.getSchemaNameFromFullName(fullTableName),
-                        SchemaUtil.getTableNameFromFullName(fullTableName));
-            }
-        }
-    }
-
     private boolean ensureViewIndexTableDropped(byte[] physicalTableName, long 
timestamp) throws SQLException {
         byte[] physicalIndexName = 
MetaDataUtil.getViewIndexPhysicalName(physicalTableName);
         HTableDescriptor desc = null;
@@ -1385,22 +1337,26 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     }
 
     private boolean ensureLocalIndexTableDropped(byte[] physicalTableName, 
long timestamp) throws SQLException {
-        byte[] physicalIndexName = 
MetaDataUtil.getLocalIndexPhysicalName(physicalTableName);
         HTableDescriptor desc = null;
         boolean wasDeleted = false;
         try (HBaseAdmin admin = getAdmin()) {
             try {
-                desc = admin.getTableDescriptor(physicalIndexName);
-                if 
(Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES))))
 {
-                    this.tableStatsCache.invalidate(new 
ImmutableBytesPtr(physicalIndexName));
-                    final ReadOnlyProps props = this.getProps();
-                    final boolean dropMetadata = 
props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
-                    if (dropMetadata) {
-                        admin.disableTable(physicalIndexName);
-                        admin.deleteTable(physicalIndexName);
-                        clearTableRegionCache(physicalIndexName);
-                        wasDeleted = true;
+                desc = admin.getTableDescriptor(physicalTableName);
+                this.tableStatsCache.invalidate(new 
ImmutableBytesPtr(physicalTableName));
+                final ReadOnlyProps props = this.getProps();
+                final boolean dropMetadata = 
props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
+                if (dropMetadata) {
+                    List<String> columnFamiles = new ArrayList<String>();
+                    for(HColumnDescriptor cf : desc.getColumnFamilies()) {
+                        
if(cf.getNameAsString().startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX))
 {
+                            columnFamiles.add(cf.getNameAsString());
+                        }  
                     }
+                    for(String cf: columnFamiles) {
+                        admin.deleteColumn(physicalTableName, cf);
+                    }  
+                    clearTableRegionCache(physicalTableName);
+                    wasDeleted = true;
                 }
             } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) {
                 // Ignore, as we may never have created a view index table
@@ -1424,9 +1380,14 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         byte[] schemaBytes = 
rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
         byte[] tableBytes = 
rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
         byte[] tableName = physicalTableName != null ? physicalTableName : 
SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes);
-        boolean localIndexTable = 
Boolean.TRUE.equals(tableProps.remove(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME));
-
-        if ((tableType == PTableType.VIEW && physicalTableName != null) || 
(tableType != PTableType.VIEW && physicalTableName == null)) {
+        boolean localIndexTable = false;
+        for(Pair<byte[], Map<String, Object>> family: families) {
+               
if(Bytes.toString(family.getFirst()).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX))
 {
+                       localIndexTable = true;
+                       break;
+               }
+        }
+        if ((tableType == PTableType.VIEW && physicalTableName != null) || 
(tableType != PTableType.VIEW && (physicalTableName == null || 
localIndexTable))) {
             // For views this will ensure that metadata already exists
             // For tables and indexes, this will create the metadata if it 
doesn't already exist
             ensureTableCreated(tableName, tableType, tableProps, families, 
splits, true, isNamespaceMapped);
@@ -1436,10 +1397,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             // Physical index table created up front for multi tenant
             // TODO: if viewIndexId is Short.MIN_VALUE, then we don't need to 
attempt to create it
             if (physicalTableName != null) {
-                if (localIndexTable) {
-                    ensureLocalIndexTableCreated(tableName, tableProps, 
families, splits,
-                            MetaDataUtil.getClientTimeStamp(m), 
isNamespaceMapped);
-                } else if (!MetaDataUtil.isMultiTenant(m, kvBuilder, ptr)) {
+                if (!localIndexTable && !MetaDataUtil.isMultiTenant(m, 
kvBuilder, ptr)) {
                     ensureViewIndexTableCreated(tenantIdBytes.length == 0 ? 
null : PNameFactory.newName(tenantIdBytes),
                             physicalTableName, 
MetaDataUtil.getClientTimeStamp(m), isNamespaceMapped);
                 }
@@ -1561,6 +1519,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 dropTables(result.getTableNamesToDelete());
             }
             invalidateTables(result.getTableNamesToDelete());
+            long timestamp = MetaDataUtil.getClientTimeStamp(tableMetaData);
             if (tableType == PTableType.TABLE) {
                 boolean isNamespaceMapped = 
result.getTable().isNamespaceMapped();
                 byte[] physicalName;
@@ -1569,7 +1528,6 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 } else {
                     physicalName = TableName.valueOf(schemaBytes, 
tableBytes).getName();
                 }
-                long timestamp = 
MetaDataUtil.getClientTimeStamp(tableMetaData);
                 ensureViewIndexTableDropped(physicalName, timestamp);
                 ensureLocalIndexTableDropped(physicalName, timestamp);
                 tableStatsCache.invalidate(new 
ImmutableBytesPtr(physicalName));
@@ -2479,6 +2437,19 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                                 }
 
                                 if (currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) {
+                                    Properties props = 
PropertiesUtil.deepCopy(metaConnection.getClientInfo());
+                                    
props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
+                                    
props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+                                    PhoenixConnection conn =
+                                            new 
PhoenixConnection(ConnectionQueryServicesImpl.this,
+                                                    metaConnection.getURL(), 
props, metaConnection
+                                                            
.getMetaDataCache());
+                                    try {
+                                        UpgradeUtil.upgradeLocalIndexes(conn, 
true);
+                                    } finally {
+                                        if (conn != null) conn.close();
+                                    }
+
                                     metaConnection = 
addColumnsIfNotExists(metaConnection,
                                             
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
                                             
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2,
@@ -3622,7 +3593,6 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         return supportsFeature(ConnectionQueryServices.Feature.RENEW_LEASE) && 
renewLeaseEnabled;
     }
 
-
     @Override
     public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] 
row) throws SQLException {
        /*

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 4efb708..91c84e0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -166,6 +166,7 @@ public interface QueryConstants {
     public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = 
ByteUtil.EMPTY_BYTE_ARRAY;
 
     public static final byte[] TRUE = new byte[] {1};
+    
 
     /**
      * Separator used between variable length keys for a composite key.
@@ -195,6 +196,16 @@ public interface QueryConstants {
     public static final ImmutableBytesPtr DEFAULT_COLUMN_FAMILY_BYTES_PTR = 
new ImmutableBytesPtr(
             DEFAULT_COLUMN_FAMILY_BYTES);
 
+    public static final String LOCAL_INDEX_COLUMN_FAMILY_PREFIX = "L#";
+    public static final byte[] LOCAL_INDEX_COLUMN_FAMILY_PREFIX_BYTES = 
Bytes.toBytes(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
+    public static final ImmutableBytesPtr LOCAL_INDEX_COLUMN_FAMILY_PREFIX_PTR 
= new ImmutableBytesPtr(
+        LOCAL_INDEX_COLUMN_FAMILY_PREFIX_BYTES);
+    
+    public static final String DEFAULT_LOCAL_INDEX_COLUMN_FAMILY = 
LOCAL_INDEX_COLUMN_FAMILY_PREFIX + DEFAULT_COLUMN_FAMILY;
+    public static final byte[] DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES = 
Bytes.toBytes(DEFAULT_LOCAL_INDEX_COLUMN_FAMILY);
+    public static final ImmutableBytesPtr 
DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr(
+               DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES);
+
     public static final String ALL_FAMILY_PROPERTIES_KEY = "";
     public static final String SYSTEM_TABLE_PK_NAME = "pk";
 

Reply via email to