http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java index 146028e..3353f9e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java @@ -17,677 +17,8 @@ */ package org.apache.phoenix.hbase.index.balancer; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HBaseIOException; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.master.LoadBalancer; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.RegionPlan; -import org.apache.hadoop.hbase.master.RegionState; -import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.util.EnvironmentEdgeManager; -import org.apache.hadoop.util.ReflectionUtils; - -/** - * <p>This class is an extension of the load balancer class. - * It allows to co-locate the regions of the user table and the regions of corresponding - * index table if any.</p> - * - * </>roundRobinAssignment, retainAssignment -> index regions will follow the actual table regions. - * randomAssignment, balancerCluster -> either index table or actual table region(s) will follow - * each other based on which ever comes first.</p> - * - * <p>In case of master failover there is a chance that the znodes of the index - * table and actual table are left behind. Then in that scenario we may get randomAssignment for - * either the actual table region first or the index table region first.</p> - * - * <p>In case of balancing by table any table can balance first.</p> - * - */ - -public class IndexLoadBalancer implements LoadBalancer { - - private static final Log LOG = LogFactory.getLog(IndexLoadBalancer.class); - - public static final byte[] PARENT_TABLE_KEY = Bytes.toBytes("PARENT_TABLE"); - - public static final String INDEX_BALANCER_DELEGATOR = "hbase.index.balancer.delegator.class"; - - private LoadBalancer delegator; - - private MasterServices master; - - private Configuration conf; - - private ClusterStatus clusterStatus; - - private static final Random RANDOM = new Random(EnvironmentEdgeManager.currentTimeMillis()); - - Map<TableName, TableName> userTableVsIndexTable = new HashMap<TableName, TableName>(); - - Map<TableName, TableName> indexTableVsUserTable = new HashMap<TableName, TableName>(); - - /** - * Maintains colocation information of user regions and corresponding index regions. - */ - private Map<TableName, Map<ImmutableBytesWritable, ServerName>> colocationInfo = - new ConcurrentHashMap<TableName, Map<ImmutableBytesWritable, ServerName>>(); - - private Set<TableName> balancedTables = new HashSet<TableName>(); - - private boolean stopped = false; - - @Override - public void initialize() throws HBaseIOException { - Class<? extends LoadBalancer> delegatorKlass = - conf.getClass(INDEX_BALANCER_DELEGATOR, StochasticLoadBalancer.class, - LoadBalancer.class); - this.delegator = ReflectionUtils.newInstance(delegatorKlass, conf); - this.delegator.setClusterStatus(clusterStatus); - this.delegator.setMasterServices(this.master); - this.delegator.initialize(); - try { - populateTablesToColocate(this.master.getTableDescriptors().getAll()); - } catch (IOException e) { - throw new HBaseIOException(e); - } - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration configuration) { - this.conf = configuration; - } - - @Override - public void onConfigurationChange(Configuration conf) { - setConf(conf); - } - - @Override - public void setClusterStatus(ClusterStatus st) { - this.clusterStatus = st; - } - - public Map<TableName, Map<ImmutableBytesWritable, ServerName>> getColocationInfo() { - return colocationInfo; - } - - @Override - public void setMasterServices(MasterServices masterServices) { - this.master = masterServices; - } - - @Override - public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) - throws HBaseIOException { - synchronized (this.colocationInfo) { - boolean balanceByTable = conf.getBoolean("hbase.master.loadbalance.bytable", false); - List<RegionPlan> regionPlans = null; - - TableName tableName = null; - if (balanceByTable) { - Map<ImmutableBytesWritable, ServerName> tableKeys = null; - for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : clusterState - .entrySet()) { - ServerName sn = serverVsRegionList.getKey(); - List<HRegionInfo> regionInfos = serverVsRegionList.getValue(); - if (regionInfos.isEmpty()) { - continue; - } - if (!isTableColocated(regionInfos.get(0).getTable())) { - return this.delegator.balanceCluster(clusterState); - } - // Just get the table name from any one of the values in the regioninfo list - if (tableName == null) { - tableName = regionInfos.get(0).getTable(); - tableKeys = this.colocationInfo.get(tableName); - } - // Check and modify the colocation info map based on values of cluster state - // because we - // will - // call balancer only when the cluster is in stable and reliable state. - if (tableKeys != null) { - for (HRegionInfo hri : regionInfos) { - updateServer(tableKeys, sn, hri); - } - } - } - // If user table is already balanced find the index table plans from the user table - // plans - // or vice verca. - TableName mappedTableName = getMappedTableToColocate(tableName); - if (balancedTables.contains(mappedTableName)) { - balancedTables.remove(mappedTableName); - regionPlans = new ArrayList<RegionPlan>(); - return prepareRegionPlansForClusterState(clusterState, regionPlans); - } else { - balancedTables.add(tableName); - regionPlans = this.delegator.balanceCluster(clusterState); - if (regionPlans == null) { - if (LOG.isDebugEnabled()) { - LOG.debug(tableName + " regions already balanced."); - } - return null; - } else { - updateRegionPlans(regionPlans); - return regionPlans; - } - } - - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Seperating user tables and index tables regions of " - + "each region server in the cluster."); - } - Map<ServerName, List<HRegionInfo>> userClusterState = - new HashMap<ServerName, List<HRegionInfo>>(); - Map<ServerName, List<HRegionInfo>> indexClusterState = - new HashMap<ServerName, List<HRegionInfo>>(); - for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : clusterState - .entrySet()) { - ServerName sn = serverVsRegionList.getKey(); - List<HRegionInfo> regionsInfos = serverVsRegionList.getValue(); - List<HRegionInfo> idxRegionsToBeMoved = new ArrayList<HRegionInfo>(); - List<HRegionInfo> userRegionsToBeMoved = new ArrayList<HRegionInfo>(); - for (HRegionInfo hri : regionsInfos) { - if (hri.isMetaRegion()) { - continue; - } - tableName = hri.getTable(); - // Check and modify the colocation info map based on values of cluster state - // because we - // will - // call balancer only when the cluster is in stable and reliable state. - if (isTableColocated(tableName)) { - // table name may change every time thats why always need to get table - // entries. - Map<ImmutableBytesWritable, ServerName> tableKeys = - this.colocationInfo.get(tableName); - if (tableKeys != null) { - updateServer(tableKeys, sn, hri); - } - } - if (indexTableVsUserTable.containsKey(tableName)) { - idxRegionsToBeMoved.add(hri); - continue; - } - userRegionsToBeMoved.add(hri); - } - // there may be dummy entries here if assignments by table is set - userClusterState.put(sn, userRegionsToBeMoved); - indexClusterState.put(sn, idxRegionsToBeMoved); - } - - regionPlans = this.delegator.balanceCluster(userClusterState); - if (regionPlans == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("User region plan is null."); - } - regionPlans = new ArrayList<RegionPlan>(); - } else { - updateRegionPlans(regionPlans); - } - return prepareRegionPlansForClusterState(indexClusterState, regionPlans); - } - } - } - - private void updateServer(Map<ImmutableBytesWritable, ServerName> tableKeys, ServerName sn, - HRegionInfo hri) { - ImmutableBytesWritable startKey = new ImmutableBytesWritable(hri.getStartKey()); - ServerName existingServer = tableKeys.get(startKey); - if (!sn.equals(existingServer)) { - if (LOG.isDebugEnabled()) { - LOG.debug("There is a mismatch in the existing server name for the region " + hri - + ". Replacing the server " + existingServer + " with " + sn + "."); - } - tableKeys.put(startKey, sn); - } - } - - /** - * Prepare region plans for cluster state - * @param clusterState if balancing is table wise then cluster state contains only indexed or - * index table regions, otherwise it contains all index tables regions. - * @param regionPlans - * @return - */ - private List<RegionPlan> prepareRegionPlansForClusterState( - Map<ServerName, List<HRegionInfo>> clusterState, List<RegionPlan> regionPlans) { - if (regionPlans == null) regionPlans = new ArrayList<RegionPlan>(); - ImmutableBytesWritable startKey = new ImmutableBytesWritable(); - for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : clusterState.entrySet()) { - List<HRegionInfo> regionInfos = serverVsRegionList.getValue(); - ServerName server = serverVsRegionList.getKey(); - for (HRegionInfo regionInfo : regionInfos) { - if (!isTableColocated(regionInfo.getTable())) continue; - TableName mappedTableName = getMappedTableToColocate(regionInfo.getTable()); - startKey.set(regionInfo.getStartKey()); - ServerName sn = this.colocationInfo.get(mappedTableName).get(startKey); - if (sn.equals(server)) { - continue; - } else { - RegionPlan rp = new RegionPlan(regionInfo, server, sn); - if (LOG.isDebugEnabled()) { - LOG.debug("Selected server " + rp.getDestination() - + " as destination for region " - + regionInfo.getRegionNameAsString() + " from colocation info."); - } - regionOnline(regionInfo, rp.getDestination()); - regionPlans.add(rp); - } - } - } - return regionPlans; - } - - private void updateRegionPlans(List<RegionPlan> regionPlans) { - for (RegionPlan regionPlan : regionPlans) { - HRegionInfo hri = regionPlan.getRegionInfo(); - if (!isTableColocated(hri.getTable())) continue; - if (LOG.isDebugEnabled()) { - LOG.debug("Saving region plan of region " + hri.getRegionNameAsString() + '.'); - } - regionOnline(hri, regionPlan.getDestination()); - } - } - - @Override - public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions, - List<ServerName> servers) throws HBaseIOException { - List<HRegionInfo> userRegions = new ArrayList<HRegionInfo>(); - List<HRegionInfo> indexRegions = new ArrayList<HRegionInfo>(); - for (HRegionInfo hri : regions) { - seperateUserAndIndexRegion(hri, userRegions, indexRegions); - } - Map<ServerName, List<HRegionInfo>> bulkPlan = null; - if (!userRegions.isEmpty()) { - bulkPlan = this.delegator.roundRobinAssignment(userRegions, servers); - // This should not happen. - if (null == bulkPlan) { - if (LOG.isDebugEnabled()) { - LOG.debug("No region plans selected for user regions in roundRobinAssignment."); - } - return null; - } - savePlan(bulkPlan); - } - bulkPlan = prepareIndexRegionsPlan(indexRegions, bulkPlan, servers); - return bulkPlan; - } - - private void seperateUserAndIndexRegion(HRegionInfo hri, List<HRegionInfo> userRegions, - List<HRegionInfo> indexRegions) { - if (indexTableVsUserTable.containsKey(hri.getTable())) { - indexRegions.add(hri); - return; - } - userRegions.add(hri); - } - - private Map<ServerName, List<HRegionInfo>> prepareIndexRegionsPlan( - List<HRegionInfo> indexRegions, Map<ServerName, List<HRegionInfo>> bulkPlan, - List<ServerName> servers) throws HBaseIOException { - if (null != indexRegions && !indexRegions.isEmpty()) { - if (null == bulkPlan) { - bulkPlan = new ConcurrentHashMap<ServerName, List<HRegionInfo>>(); - } - for (HRegionInfo hri : indexRegions) { - if (LOG.isDebugEnabled()) { - LOG.debug("Preparing region plan for index region " - + hri.getRegionNameAsString() + '.'); - } - ServerName destServer = getDestServerForIdxRegion(hri); - List<HRegionInfo> destServerRegions = null; - if (destServer == null) destServer = this.randomAssignment(hri, servers); - if (destServer != null) { - destServerRegions = bulkPlan.get(destServer); - if (null == destServerRegions) { - destServerRegions = new ArrayList<HRegionInfo>(); - bulkPlan.put(destServer, destServerRegions); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Server " + destServer + " selected for region " - + hri.getRegionNameAsString() + '.'); - } - destServerRegions.add(hri); - regionOnline(hri, destServer); - } - } - } - return bulkPlan; - } - - private ServerName getDestServerForIdxRegion(HRegionInfo hri) { - // Every time we calculate the table name because in case of master restart the index - // regions - // may be coming for different index tables. - TableName actualTable = getMappedTableToColocate(hri.getTable()); - ImmutableBytesWritable startkey = new ImmutableBytesWritable(hri.getStartKey()); - synchronized (this.colocationInfo) { - - Map<ImmutableBytesWritable, ServerName> tableKeys = colocationInfo.get(actualTable); - if (null == tableKeys) { - // Can this case come - return null; - } - if (tableKeys.containsKey(startkey)) { - // put index region location if corresponding user region found in regionLocation - // map. - ServerName sn = tableKeys.get(startkey); - regionOnline(hri, sn); - return sn; - } - } - return null; - } - - private void savePlan(Map<ServerName, List<HRegionInfo>> bulkPlan) { - synchronized (this.colocationInfo) { - for (Entry<ServerName, List<HRegionInfo>> e : bulkPlan.entrySet()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Saving user regions' plans for server " + e.getKey() + '.'); - } - for (HRegionInfo hri : e.getValue()) { - if (!isTableColocated(hri.getTable())) continue; - regionOnline(hri, e.getKey()); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Saved user regions' plans for server " + e.getKey() + '.'); - } - } - } - } - - @Override - public Map<ServerName, List<HRegionInfo>> retainAssignment( - Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException { - Map<HRegionInfo, ServerName> userRegionsMap = - new ConcurrentHashMap<HRegionInfo, ServerName>(); - List<HRegionInfo> indexRegions = new ArrayList<HRegionInfo>(); - for (Entry<HRegionInfo, ServerName> e : regions.entrySet()) { - seperateUserAndIndexRegion(e, userRegionsMap, indexRegions, servers); - } - Map<ServerName, List<HRegionInfo>> bulkPlan = null; - if (!userRegionsMap.isEmpty()) { - bulkPlan = this.delegator.retainAssignment(userRegionsMap, servers); - if (bulkPlan == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Empty region plan for user regions."); - } - return null; - } - savePlan(bulkPlan); - } - bulkPlan = prepareIndexRegionsPlan(indexRegions, bulkPlan, servers); - return bulkPlan; - } - - private void seperateUserAndIndexRegion(Entry<HRegionInfo, ServerName> e, - Map<HRegionInfo, ServerName> userRegionsMap, List<HRegionInfo> indexRegions, - List<ServerName> servers) { - HRegionInfo hri = e.getKey(); - if (indexTableVsUserTable.containsKey(hri.getTable())) { - indexRegions.add(hri); - return; - } - if (e.getValue() == null) { - userRegionsMap.put(hri, servers.get(RANDOM.nextInt(servers.size()))); - } else { - userRegionsMap.put(hri, e.getValue()); - } - } - - @Override - public Map<HRegionInfo, ServerName> immediateAssignment(List<HRegionInfo> regions, - List<ServerName> servers) throws HBaseIOException { - return this.delegator.immediateAssignment(regions, servers); - } - - @Override - public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) - throws HBaseIOException { - if (!isTableColocated(regionInfo.getTable())) { - return this.delegator.randomAssignment(regionInfo, servers); - } - ServerName sn = getServerNameFromMap(regionInfo, servers); - if (sn == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("No server found for region " + regionInfo.getRegionNameAsString() + '.'); - } - sn = getRandomServer(regionInfo, servers); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Destination server for region " + regionInfo.getRegionNameAsString() - + " is " + ((sn == null) ? "null" : sn.toString()) + '.'); - } - return sn; - } - - private ServerName getRandomServer(HRegionInfo regionInfo, List<ServerName> servers) - throws HBaseIOException { - ServerName sn = null; - sn = this.delegator.randomAssignment(regionInfo, servers); - if (sn == null) return null; - regionOnline(regionInfo, sn); - return sn; - } - - private ServerName getServerNameFromMap(HRegionInfo regionInfo, List<ServerName> onlineServers) { - TableName tableName = regionInfo.getTable(); - TableName mappedTable = getMappedTableToColocate(regionInfo.getTable()); - ImmutableBytesWritable startKey = new ImmutableBytesWritable(regionInfo.getStartKey()); - synchronized (this.colocationInfo) { - Map<ImmutableBytesWritable, ServerName> correspondingTableKeys = - this.colocationInfo.get(mappedTable); - Map<ImmutableBytesWritable, ServerName> actualTableKeys = - this.colocationInfo.get(tableName); - - if (null != correspondingTableKeys) { - if (correspondingTableKeys.containsKey(startKey)) { - ServerName previousServer = null; - if (null != actualTableKeys) { - previousServer = actualTableKeys.get(startKey); - } - ServerName sn = correspondingTableKeys.get(startKey); - if (null != previousServer) { - // if servername of index region and user region are same in colocationInfo - // clean - // previous plans and return null - if (previousServer.equals(sn)) { - correspondingTableKeys.remove(startKey); - actualTableKeys.remove(startKey); - if (LOG.isDebugEnabled()) { - LOG - .debug("Both user region plan and corresponding index region plan " - + "in colocation info are same. Hence clearing the plans to select new plan" - + " for the region " - + regionInfo.getRegionNameAsString() + "."); - } - return null; - } - } - if (sn != null && onlineServers.contains(sn)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Updating the region plan of the region " - + regionInfo.getRegionNameAsString() + " with server " + sn); - } - regionOnline(regionInfo, sn); - return sn; - } else if (sn != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("The location " + sn + " of region with start key" - + Bytes.toStringBinary(regionInfo.getStartKey()) - + " is not in online. Selecting other region server."); - } - return null; - } - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("No region plans in colocationInfo for table " + mappedTable); - } - } - return null; - } - } - - @Override - public void regionOnline(HRegionInfo regionInfo, ServerName sn) { - TableName tableName = regionInfo.getTable(); - synchronized (this.colocationInfo) { - Map<ImmutableBytesWritable, ServerName> tabkeKeys = this.colocationInfo.get(tableName); - if (tabkeKeys == null) { - tabkeKeys = new ConcurrentHashMap<ImmutableBytesWritable, ServerName>(); - this.colocationInfo.put(tableName, tabkeKeys); - } - tabkeKeys.put(new ImmutableBytesWritable(regionInfo.getStartKey()), sn); - } - } - - public void clearTableRegionPlans(TableName tableName) { - if (LOG.isDebugEnabled()) { - LOG.debug("Clearing regions plans from colocationInfo for table " + tableName); - } - synchronized (this.colocationInfo) { - this.colocationInfo.remove(tableName); - } - } - - @Override - public void regionOffline(HRegionInfo regionInfo) { - TableName tableName = regionInfo.getTable(); - synchronized (this.colocationInfo) { - Map<ImmutableBytesWritable, ServerName> tableKeys = this.colocationInfo.get(tableName); - if (null == tableKeys) { - if (LOG.isDebugEnabled()) { - LOG.debug("No regions of table " + tableName + " in the colocationInfo."); - } - } else { - tableKeys.remove(new ImmutableBytesWritable(regionInfo.getStartKey())); - if (LOG.isDebugEnabled()) { - LOG.debug("The regioninfo " + regionInfo + " removed from the colocationInfo"); - } - } - } - } - - @Override - public boolean isStopped() { - return stopped; - } - - @Override - public void stop(String why) { - LOG.info("Load Balancer stop requested: " + why); - stopped = true; - } - - public void populateTablesToColocate(Map<String, HTableDescriptor> tableDescriptors) { - HTableDescriptor desc = null; - for (Entry<String, HTableDescriptor> entry : tableDescriptors.entrySet()) { - desc = entry.getValue(); - if (desc.getValue(PARENT_TABLE_KEY) != null) { - addTablesToColocate(TableName.valueOf(desc.getValue(PARENT_TABLE_KEY)), desc - .getTableName()); - } - } - } - - /** - * Add tables whose regions to co-locate. - * @param userTable - * @param indexTable - */ - public void addTablesToColocate(TableName userTable, TableName indexTable) { - if (userTable.equals(indexTable)) { - throw new IllegalArgumentException("Tables to colocate should not be same."); - } else if (isTableColocated(userTable)) { - throw new IllegalArgumentException("User table already colocated with table " - + getMappedTableToColocate(userTable)); - } else if (isTableColocated(indexTable)) { - throw new IllegalArgumentException("Index table is already colocated with table " - + getMappedTableToColocate(indexTable)); - } - userTableVsIndexTable.put(userTable, indexTable); - indexTableVsUserTable.put(indexTable, userTable); - } - - /** - * Removes the specified table and corresponding table from co-location. - * @param table - */ - public void removeTablesFromColocation(TableName table) { - TableName other = userTableVsIndexTable.remove(table); - if (other != null) { - indexTableVsUserTable.remove(other); - } else { - other = indexTableVsUserTable.remove(table); - if (other != null) userTableVsIndexTable.remove(other); - } - } - - /** - * Return mapped table to co-locate. - * @param tableName - * @return index table if the specified table is user table or vice versa. - */ - public TableName getMappedTableToColocate(TableName tableName) { - TableName other = userTableVsIndexTable.get(tableName); - return other == null ? indexTableVsUserTable.get(tableName) : other; - } - public boolean isTableColocated(TableName table) { - return userTableVsIndexTable.containsKey(table) || indexTableVsUserTable.containsKey(table); - } +public class IndexLoadBalancer extends StochasticLoadBalancer { - /** - * Populates table's region locations into co-location info from master. - * @param table - */ - public void populateRegionLocations(TableName table) { - synchronized (this.colocationInfo) { - if (!isTableColocated(table)) { - throw new IllegalArgumentException("Specified table " + table - + " should be in one of the tables to co-locate."); - } - RegionStates regionStates = this.master.getAssignmentManager().getRegionStates(); - List<HRegionInfo> onlineRegions = regionStates.getRegionsOfTable(table); - for (HRegionInfo hri : onlineRegions) { - regionOnline(hri, regionStates.getRegionServerOfRegion(hri)); - } - Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition(); - for (RegionState regionState : regionsInTransition.values()) { - if (table.equals(regionState.getRegion().getTable()) - && regionState.getServerName() != null) { - regionOnline(regionState.getRegion(), regionState.getServerName()); - } - } - } - } }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java index a014da2..2f83f8d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java @@ -17,98 +17,12 @@ */ package org.apache.phoenix.hbase.index.master; -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver; -import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.master.AssignmentManager; -import org.apache.hadoop.hbase.master.LoadBalancer; -import org.apache.hadoop.hbase.master.RegionPlan; -import org.apache.hadoop.hbase.master.RegionStates; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.hbase.index.balancer.IndexLoadBalancer; -import org.apache.phoenix.util.MetaDataUtil; /** * Defines of coprocessor hooks(to support secondary indexing) of operations on * {@link org.apache.hadoop.hbase.master.HMaster} process. */ public class IndexMasterObserver extends BaseMasterObserver { - IndexLoadBalancer balancer = null; - - @Override - public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> ctx) - throws IOException { - LoadBalancer loadBalancer = - ctx.getEnvironment().getMasterServices().getAssignmentManager().getBalancer(); - if (loadBalancer instanceof IndexLoadBalancer) { - balancer = (IndexLoadBalancer) loadBalancer; - } - super.preMasterInitialization(ctx); - } - - @Override - public void preCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx, - HTableDescriptor desc, HRegionInfo[] regions) throws IOException { - TableName userTableName = null; - if (balancer != null && desc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) != null) { - userTableName = - TableName.valueOf(desc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY)); - balancer.addTablesToColocate(userTableName, desc.getTableName()); - } - if (userTableName != null) balancer.populateRegionLocations(userTableName); - super.preCreateTableHandler(ctx, desc, regions); - } - - @Override - public void preModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx, - TableName tableName, HTableDescriptor htd) throws IOException { - HTableDescriptor oldDesc = - ctx.getEnvironment().getMasterServices().getTableDescriptors().get(tableName); - if (oldDesc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) == null - && htd.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) != null) { - TableName userTableName = - TableName.valueOf(htd.getValue(IndexLoadBalancer.PARENT_TABLE_KEY)); - balancer.addTablesToColocate(userTableName, htd.getTableName()); - } - super.preModifyTableHandler(ctx, tableName, htd); - } - - @Override - public void postMove(ObserverContext<MasterCoprocessorEnvironment> ctx, HRegionInfo region, - ServerName srcServer, ServerName destServer) throws IOException { - if (balancer != null && balancer.isTableColocated(region.getTable())) { - AssignmentManager am = ctx.getEnvironment().getMasterServices().getAssignmentManager(); - RegionStates regionStates = am.getRegionStates(); - String tableName = region.getTable().getNameAsString(); - String correspondingTable = MetaDataUtil.isLocalIndex(region.getTable().getNameAsString()) - ? MetaDataUtil.getUserTableName(tableName) - : Bytes.toString(MetaDataUtil.getLocalIndexPhysicalName(tableName.getBytes())); - List<HRegionInfo> regions = - regionStates.getRegionsOfTable(TableName.valueOf(correspondingTable)); - for (HRegionInfo hri : regions) { - if (Bytes.compareTo(region.getStartKey(), hri.getStartKey()) == 0 - && destServer != null) { - balancer.regionOnline(hri, destServer); - am.addPlan(hri.getEncodedName(), new RegionPlan(hri, null, destServer)); - am.unassign(hri); - } - } - } - super.postMove(ctx, region, srcServer, destServer); - } - @Override - public void postDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx, - TableName tableName) throws IOException { - if (balancer != null && balancer.isTableColocated(tableName)) { - balancer.removeTablesFromColocation(tableName); - } - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java index d7fef5e..5e3f3ed 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java @@ -32,6 +32,6 @@ public interface IndexCommitter extends Stoppable { void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name); - public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) + public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, boolean allowLocalUpdates) throws IndexWriteException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java index 30797b2..cbcec3b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java @@ -128,10 +128,11 @@ public class IndexWriter implements Stoppable { * @param indexUpdates Updates to write * @throws IOException */ - public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException { + public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates, + boolean allowLocalUpdates) throws IOException { // convert the strings to htableinterfaces to which we can talk and group by TABLE Multimap<HTableInterfaceReference, Mutation> toWrite = resolveTableReferences(indexUpdates); - writeAndKillYourselfOnFailure(toWrite); + writeAndKillYourselfOnFailure(toWrite, allowLocalUpdates); } /** @@ -139,9 +140,10 @@ public class IndexWriter implements Stoppable { * @param toWrite * @throws IOException */ - public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite) throws IOException { + public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite, + boolean allowLocalUpdates) throws IOException { try { - write(toWrite); + write(toWrite, allowLocalUpdates); if (LOG.isTraceEnabled()) { LOG.trace("Done writing all index updates!\n\t" + toWrite); } @@ -165,21 +167,24 @@ public class IndexWriter implements Stoppable { * @throws IndexWriteException if we cannot successfully write to the index. Whether or not we * stop early depends on the {@link IndexCommitter}. */ - public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws IndexWriteException { - write(resolveTableReferences(toWrite)); - } + public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws IndexWriteException { + write(resolveTableReferences(toWrite), false); + } + + public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates) throws IndexWriteException { + write(resolveTableReferences(toWrite), allowLocalUpdates); + } /** * see {@link #write(Collection)} * @param toWrite * @throws IndexWriteException */ - public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) - throws IndexWriteException { - this.writer.write(toWrite); + public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, boolean allowLocalUpdates) + throws IndexWriteException { + this.writer.write(toWrite, allowLocalUpdates); } - /** * Convert the passed index updates to {@link HTableInterfaceReference}s. * @param indexUpdates from the index builder http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java index 233dc57..0dc11bc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java @@ -21,8 +21,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -98,7 +100,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { } @Override - public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) throws SingleIndexWriteFailureException { + public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws SingleIndexWriteFailureException { /* * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the writes in * parallel to each index table, so each table gets its own task and is submitted to the pool. Where it gets @@ -116,7 +118,12 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { // doing a complete copy over of all the index update for each table. final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue()); final HTableInterfaceReference tableReference = entry.getKey(); - final RegionCoprocessorEnvironment env = this.env; + if (env != null + && !allowLocalUpdates + && tableReference.getTableName().equals( + env.getRegion().getTableDesc().getNameAsString())) { + continue; + } /* * Write a batch of index updates to an index table. This operation stops (is cancelable) via two * mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread. @@ -145,29 +152,14 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { LOG.debug("Writing index update:" + mutations + " to table: " + tableReference); } try { - // TODO: Once HBASE-11766 is fixed, reexamine whether this is necessary. - // Also, checking the prefix of the table name to determine if this is a local - // index is pretty hacky. If we're going to keep this, we should revisit that - // as well. - try { - if (MetaDataUtil.isLocalIndex(tableReference.getTableName())) { - Region indexRegion = IndexUtil.getIndexRegion(env); - if (indexRegion != null) { - throwFailureIfDone(); - indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]), - HConstants.NO_NONCE, HConstants.NO_NONCE); - return null; - } - } - } catch (IOException ignord) { - // when it's failed we fall back to the standard & slow way - if (LOG.isDebugEnabled()) { - LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error=" - + ignord); - } - } + if (allowLocalUpdates) { + for (Mutation m : mutations) { + m.setDurability(Durability.SKIP_WAL); + } + } HTableInterface table = factory.getTable(tableReference.get()); throwFailureIfDone(); + int i = 0; table.batch(mutations); } catch (SingleIndexWriteFailureException e) { throw e; http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java index 14768ac..fec74ca 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -110,7 +111,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { } @Override - public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) throws MultiIndexWriteFailureException { + public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws MultiIndexWriteFailureException { Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet(); TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size()); List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size()); @@ -121,6 +122,12 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { // track each reference so we can get at it easily later, when determing failures final HTableInterfaceReference tableReference = entry.getKey(); final RegionCoprocessorEnvironment env = this.env; + if (env != null + && !allowLocalUpdates + && tableReference.getTableName().equals( + env.getRegion().getTableDesc().getNameAsString())) { + continue; + } tables.add(tableReference); /* @@ -144,33 +151,16 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { try { // this may have been queued, but there was an abort/stop so we try to early exit throwFailureIfDone(); + if (allowLocalUpdates) { + for (Mutation m : mutations) { + m.setDurability(Durability.SKIP_WAL); + } + } if (LOG.isDebugEnabled()) { LOG.debug("Writing index update:" + mutations + " to table: " + tableReference); } - try { - // TODO: Once HBASE-11766 is fixed, reexamine whether this is necessary. - // Also, checking the prefix of the table name to determine if this is a local - // index is pretty hacky. If we're going to keep this, we should revisit that - // as well. - if (MetaDataUtil.isLocalIndex(tableReference.getTableName())) { - Region indexRegion = IndexUtil.getIndexRegion(env); - if (indexRegion != null) { - throwFailureIfDone(); - indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]), - HConstants.NO_NONCE, HConstants.NO_NONCE); - return Boolean.TRUE; - } - } - } catch (IOException ignord) { - // when it's failed we fall back to the standard & slow way - if (LOG.isDebugEnabled()) { - LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error=" - + ignord); - } - } - HTableInterface table = factory.getTable(tableReference.get()); throwFailureIfDone(); table.batch(mutations); http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 1725b11..6a316d9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -274,6 +274,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key) private Set<ColumnReference> indexedColumns; private Set<ColumnReference> coveredColumns; + private Map<ColumnReference, ColumnReference> coveredColumnsMap; // columns required to create index row i.e. indexedColumns + coveredColumns (this does not include columns in the data row key) private Set<ColumnReference> allColumns; // TODO remove this in the next major release @@ -363,6 +364,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { this.indexedColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); this.indexedExpressions = Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); this.coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns); + this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns-nIndexPKColumns); this.nIndexSaltBuckets = nIndexSaltBuckets == null ? 0 : nIndexSaltBuckets; this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable); this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index); @@ -430,6 +432,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { for (PColumn indexColumn : family.getColumns()) { PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); this.coveredColumns.add(new ColumnReference(column.getFamilyName().getBytes(), column.getName().getBytes())); + if(isLocalIndex) { + this.coveredColumnsMap.put( + new ColumnReference(column.getFamilyName().getBytes(), column.getName() + .getBytes()), + new ColumnReference(isLocalIndex ? Bytes.toBytes(IndexUtil + .getLocalIndexColumnFamily(column.getFamilyName().getString())) + : column.getFamilyName().getBytes(), column.getName().getBytes())); + } } } this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize); @@ -861,7 +871,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } //this is a little bit of extra work for installations that are running <0.94.14, but that should be rare and is a short-term set of wrappers - it shouldn't kill GC - put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), cq, ts, value)); + if(this.isLocalIndex) { + ColumnReference columnReference = this.coveredColumnsMap.get(ref); + put.add(kvBuilder.buildPut(rowKey, columnReference.getFamilyWritable(), cq, ts, value)); + } else { + put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), cq, ts, value)); + } } } return put; @@ -949,11 +964,17 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // If table delete was single version, then index delete should be as well if (deleteType == DeleteType.SINGLE_VERSION) { for (ColumnReference ref : getCoverededColumns()) { // FIXME: Keep Set<byte[]> for index CFs? + if(this.isLocalIndex) { + ref = this.coveredColumnsMap.get(ref); + } delete.deleteFamilyVersion(ref.getFamily(), ts); } delete.deleteFamilyVersion(emptyCF, ts); } else { for (ColumnReference ref : getCoverededColumns()) { // FIXME: Keep Set<byte[]> for index CFs? + if(this.isLocalIndex) { + ref = this.coveredColumnsMap.get(ref); + } delete.deleteFamily(ref.getFamily(), ts); } delete.deleteFamily(emptyCF, ts); @@ -971,11 +992,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { delete = new Delete(indexRowKey); delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } + ColumnReference columnReference = ref; + if(this.isLocalIndex) { + columnReference = this.coveredColumnsMap.get(ref); + } // If point delete for data table, then use point delete for index as well if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { - delete.deleteColumn(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + delete.deleteColumn(columnReference.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); } else { - delete.deleteColumns(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + delete.deleteColumns(columnReference.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); } } } @@ -1030,10 +1055,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0; int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1; coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns); + coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns); for (int i = 0; i < nCoveredColumns; i++) { byte[] cf = Bytes.readByteArray(input); byte[] cq = Bytes.readByteArray(input); - coveredColumns.add(new ColumnReference(cf,cq)); + ColumnReference ref = new ColumnReference(cf,cq); + coveredColumns.add(ref); + if(isLocalIndex) { + coveredColumnsMap.put(ref, new ColumnReference(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(Bytes.toString(cf))), cq)); + } } // Hack to serialize whether the index row key is optimizable int len = WritableUtils.readVInt(input); http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 8ad4d3e..9d2955b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -71,7 +71,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec { Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations()); ValueGetter valueGetter = statePair.getFirst(); IndexUpdate indexUpdate = statePair.getSecond(); - indexUpdate.setTable(maintainer.getIndexTableName()); + indexUpdate.setTable(maintainer.isLocalIndex() ? state.getEnvironment().getRegion() + .getTableDesc().getName() : maintainer.getIndexTableName()); Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env .getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey()); indexUpdate.setUpdate(put); @@ -95,7 +96,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec { Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations()); ValueGetter valueGetter = statePair.getFirst(); IndexUpdate indexUpdate = statePair.getSecond(); - indexUpdate.setTable(maintainer.getIndexTableName()); + indexUpdate.setTable(maintainer.isLocalIndex() ? state.getEnvironment().getRegion() + .getTableDesc().getName() : maintainer.getIndexTableName()); Delete delete = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(), state.getCurrentTimestamp(), env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey()); indexUpdate.setUpdate(delete); http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index 17da04e..d7850ba 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -138,7 +138,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { } // its a local index table, so we need to convert it to the index table names we should disable - if (MetaDataUtil.isLocalIndex(ref.getTableName())) { + if (MetaDataUtil.hasLocalIndexColumnFamily(env.getRegion().getTableDesc())) { for (String tableName : getLocalIndexNames(ref, mutations)) { indexTableNames.put(tableName, minTimeStamp); } @@ -224,8 +224,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { try { conn = QueryUtil.getConnectionOnServer(this.env.getConfiguration()).unwrap( PhoenixConnection.class); - String userTableName = MetaDataUtil.getUserTableName(ref.getTableName()); - PTable dataTable = PhoenixRuntime.getTable(conn, userTableName); + PTable dataTable = PhoenixRuntime.getTable(conn, ref.getTableName()); List<PTable> indexes = dataTable.getIndexes(); // local index used to get view id from index mutation row key. PTable localIndex = null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 5d8879c..3d8124c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; @@ -62,6 +63,7 @@ import org.apache.phoenix.hbase.index.covered.TableState; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; +import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.query.KeyRange; @@ -69,6 +71,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; @@ -165,7 +168,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { // no index updates, so we are done if (!indexUpdates.isEmpty()) { - this.writer.write(indexUpdates); + this.writer.write(indexUpdates, true); } } catch (Throwable t) { String msg = "Failed to update index with entries:" + indexUpdates; http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java index 7f403b0..3e0fd99 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java @@ -176,7 +176,7 @@ public abstract class ExplainTable { } } - private void appendPKColumnValue(StringBuilder buf, byte[] range, Boolean isNull, int slotIndex) { + private void appendPKColumnValue(StringBuilder buf, byte[] range, Boolean isNull, int slotIndex, boolean changeViewIndexId) { if (Boolean.TRUE.equals(isNull)) { buf.append("null"); return; @@ -198,8 +198,14 @@ public abstract class ExplainTable { type.coerceBytes(ptr, type, sortOrder, SortOrder.getDefault()); range = ptr.get(); } - Format formatter = context.getConnection().getFormatter(type); - buf.append(type.toStringLiteral(range, formatter)); + if (changeViewIndexId) { + Short s = (Short) type.toObject(range); + s = (short) (s + (-Short.MAX_VALUE)); + buf.append(s.toString()); + } else { + Format formatter = context.getConnection().getFormatter(type); + buf.append(type.toStringLiteral(range, formatter)); + } } private static class RowKeyValueIterator implements Iterator<byte[]> { @@ -257,6 +263,7 @@ public abstract class ExplainTable { minMaxIterator = new RowKeyValueIterator(schema, minMaxRange.getRange(bound)); } } + boolean isLocalIndex = ScanUtil.isLocalIndex(context.getScan()); boolean forceSkipScan = this.hint.hasHint(Hint.SKIP_SCAN); int nRanges = forceSkipScan ? scanRanges.getRanges().size() : scanRanges.getBoundSlotCount(); for (int i = 0, minPos = 0; minPos < nRanges || minMaxIterator.hasNext(); i++) { @@ -275,7 +282,13 @@ public abstract class ExplainTable { minMaxIterator = Iterators.emptyIterator(); } } - appendPKColumnValue(buf, b, isNull, i); + if (isLocalIndex + && ((context.getConnection().getTenantId() != null && i == 1) || (context + .getConnection().getTenantId() == null && i == 0))) { + appendPKColumnValue(buf, b, isNull, i, true); + } else { + appendPKColumnValue(buf, b, isNull, i, false); + } buf.append(','); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java index 0525de9..41c39a3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java @@ -395,13 +395,8 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName); List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>(); for(PTable indexTable : table.getIndexes()){ - if (indexTable.getIndexType() == PTable.IndexType.LOCAL) { - indexTables.add(new TargetTableRef(indexTable.getName().getString(), - Bytes.toString(MetaDataUtil.getLocalIndexPhysicalName(table.getPhysicalName().getBytes())))); - } else { - indexTables.add( - new TargetTableRef(indexTable.getName().getString(), indexTable.getPhysicalName().getString())); - } + indexTables.add(new TargetTableRef(indexTable.getName().getString(), indexTable + .getPhysicalName().getString())); } return indexTables; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 6743688..c93a58b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -206,8 +206,7 @@ public class IndexTool extends Configured implements Tool { // computed from the qDataTable name. String physicalIndexTable = pindexTable.getPhysicalName().getString(); if (IndexType.LOCAL.equals(pindexTable.getIndexType())) { - physicalIndexTable = Bytes - .toString(MetaDataUtil.getLocalIndexPhysicalName(pdataTable.getPhysicalName().getBytes())); + physicalIndexTable = qDataTable; } final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class); http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index f593dd0..c29c0bf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -81,7 +81,6 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator; -import org.apache.hadoop.hbase.regionserver.LocalIndexSplitter; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; @@ -850,18 +849,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement null, priority, null); } - if (descriptor.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null - && Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(descriptor - .getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) { - if (!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())) { - descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(), - null, priority, null); - } - } else { - if (!descriptor.hasCoprocessor(LocalIndexSplitter.class.getName()) - && !SchemaUtil.isMetaTable(tableName) - && !SchemaUtil.isSequenceTable(tableName)) { - descriptor.addCoprocessor(LocalIndexSplitter.class.getName(), null, priority, null); + Set<byte[]> familiesKeys = descriptor.getFamiliesKeys(); + for(byte[] family: familiesKeys) { + if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { + if (!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())) { + descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(), + null, priority, null); + break; + } } } @@ -1071,8 +1066,19 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } else { if (isMetaTable) { checkClientServerCompatibility(SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName()); + } else { + for(Pair<byte[],Map<String,Object>> family: families) { + if ((newDesc.getValue(HTableDescriptor.SPLIT_POLICY)==null || !newDesc.getValue(HTableDescriptor.SPLIT_POLICY).equals( + IndexRegionSplitPolicy.class.getName())) + && Bytes.toString(family.getFirst()).startsWith( + QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { + newDesc.setValue(HTableDescriptor.SPLIT_POLICY, IndexRegionSplitPolicy.class.getName()); + break; + } + } } + if (!modifyExistingMetaData) { return existingDesc; // Caller already knows that no metadata was changed } @@ -1303,60 +1309,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } - private void ensureLocalIndexTableCreated(byte[] physicalTableName, Map<String, Object> tableProps, - List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, long timestamp, - boolean isNamespaceMapped) throws SQLException { - PTable table; - String parentTableName = SchemaUtil - .getParentTableNameFromIndexTable(Bytes.toString(physicalTableName), - MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX) - .replace(QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR); - try { - synchronized (latestMetaDataLock) { - throwConnectionClosedIfNullMetaData(); - table = latestMetaData.getTableRef(new PTableKey(PName.EMPTY_NAME, parentTableName)).getTable(); - latestMetaDataLock.notifyAll(); - } - if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be the case - throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString()); - } - } catch (TableNotFoundException e) { - byte[] schemaName = Bytes.toBytes(SchemaUtil.getSchemaNameFromFullName(parentTableName)); - byte[] tableName = Bytes.toBytes(SchemaUtil.getTableNameFromFullName(parentTableName)); - MetaDataMutationResult result = this.getTable(null, schemaName, tableName, HConstants.LATEST_TIMESTAMP, timestamp); - table = result.getTable(); - if (table == null) { - throw e; - } - } - ensureLocalIndexTableCreated(physicalTableName, tableProps, families, splits, isNamespaceMapped); - } - - private void ensureLocalIndexTableCreated(byte[] physicalTableName, Map<String, Object> tableProps, - List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, boolean isNamespaceMapped) - throws SQLException, TableAlreadyExistsException { - - // If we're not allowing local indexes or the hbase version is too low, - // don't create the local index table - if ( !this.getProps().getBoolean(QueryServices.ALLOW_LOCAL_INDEX_ATTRIB, QueryServicesOptions.DEFAULT_ALLOW_LOCAL_INDEX) - || !this.supportsFeature(Feature.LOCAL_INDEX)) { - return; - } - - tableProps.put(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME, TRUE_BYTES_AS_STRING); - HTableDescriptor desc = ensureTableCreated(physicalTableName, PTableType.TABLE, tableProps, families, splits, - true, isNamespaceMapped); - if (desc != null) { - if (!Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) { - String fullTableName = Bytes.toString(physicalTableName); - throw new TableAlreadyExistsException( - "Unable to create shared physical table for local indexes.", - SchemaUtil.getSchemaNameFromFullName(fullTableName), - SchemaUtil.getTableNameFromFullName(fullTableName)); - } - } - } - private boolean ensureViewIndexTableDropped(byte[] physicalTableName, long timestamp) throws SQLException { byte[] physicalIndexName = MetaDataUtil.getViewIndexPhysicalName(physicalTableName); HTableDescriptor desc = null; @@ -1385,22 +1337,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } private boolean ensureLocalIndexTableDropped(byte[] physicalTableName, long timestamp) throws SQLException { - byte[] physicalIndexName = MetaDataUtil.getLocalIndexPhysicalName(physicalTableName); HTableDescriptor desc = null; boolean wasDeleted = false; try (HBaseAdmin admin = getAdmin()) { try { - desc = admin.getTableDescriptor(physicalIndexName); - if (Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) { - this.tableStatsCache.invalidate(new ImmutableBytesPtr(physicalIndexName)); - final ReadOnlyProps props = this.getProps(); - final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA); - if (dropMetadata) { - admin.disableTable(physicalIndexName); - admin.deleteTable(physicalIndexName); - clearTableRegionCache(physicalIndexName); - wasDeleted = true; + desc = admin.getTableDescriptor(physicalTableName); + this.tableStatsCache.invalidate(new ImmutableBytesPtr(physicalTableName)); + final ReadOnlyProps props = this.getProps(); + final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA); + if (dropMetadata) { + List<String> columnFamiles = new ArrayList<String>(); + for(HColumnDescriptor cf : desc.getColumnFamilies()) { + if(cf.getNameAsString().startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { + columnFamiles.add(cf.getNameAsString()); + } } + for(String cf: columnFamiles) { + admin.deleteColumn(physicalTableName, cf); + } + clearTableRegionCache(physicalTableName); + wasDeleted = true; } } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) { // Ignore, as we may never have created a view index table @@ -1424,9 +1380,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement byte[] schemaBytes = rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; byte[] tableBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; byte[] tableName = physicalTableName != null ? physicalTableName : SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes); - boolean localIndexTable = Boolean.TRUE.equals(tableProps.remove(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME)); - - if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && physicalTableName == null)) { + boolean localIndexTable = false; + for(Pair<byte[], Map<String, Object>> family: families) { + if(Bytes.toString(family.getFirst()).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { + localIndexTable = true; + break; + } + } + if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && (physicalTableName == null || localIndexTable))) { // For views this will ensure that metadata already exists // For tables and indexes, this will create the metadata if it doesn't already exist ensureTableCreated(tableName, tableType, tableProps, families, splits, true, isNamespaceMapped); @@ -1436,10 +1397,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // Physical index table created up front for multi tenant // TODO: if viewIndexId is Short.MIN_VALUE, then we don't need to attempt to create it if (physicalTableName != null) { - if (localIndexTable) { - ensureLocalIndexTableCreated(tableName, tableProps, families, splits, - MetaDataUtil.getClientTimeStamp(m), isNamespaceMapped); - } else if (!MetaDataUtil.isMultiTenant(m, kvBuilder, ptr)) { + if (!localIndexTable && !MetaDataUtil.isMultiTenant(m, kvBuilder, ptr)) { ensureViewIndexTableCreated(tenantIdBytes.length == 0 ? null : PNameFactory.newName(tenantIdBytes), physicalTableName, MetaDataUtil.getClientTimeStamp(m), isNamespaceMapped); } @@ -1561,6 +1519,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement dropTables(result.getTableNamesToDelete()); } invalidateTables(result.getTableNamesToDelete()); + long timestamp = MetaDataUtil.getClientTimeStamp(tableMetaData); if (tableType == PTableType.TABLE) { boolean isNamespaceMapped = result.getTable().isNamespaceMapped(); byte[] physicalName; @@ -1569,7 +1528,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } else { physicalName = TableName.valueOf(schemaBytes, tableBytes).getName(); } - long timestamp = MetaDataUtil.getClientTimeStamp(tableMetaData); ensureViewIndexTableDropped(physicalName, timestamp); ensureLocalIndexTableDropped(physicalName, timestamp); tableStatsCache.invalidate(new ImmutableBytesPtr(physicalName)); @@ -2479,6 +2437,19 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) { + Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo()); + props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB); + props.remove(PhoenixRuntime.TENANT_ID_ATTRIB); + PhoenixConnection conn = + new PhoenixConnection(ConnectionQueryServicesImpl.this, + metaConnection.getURL(), props, metaConnection + .getMetaDataCache()); + try { + UpgradeUtil.upgradeLocalIndexes(conn, true); + } finally { + if (conn != null) conn.close(); + } + metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2, @@ -3622,7 +3593,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return supportsFeature(ConnectionQueryServices.Feature.RENEW_LEASE) && renewLeaseEnabled; } - @Override public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException { /* http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index 4efb708..91c84e0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -166,6 +166,7 @@ public interface QueryConstants { public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = ByteUtil.EMPTY_BYTE_ARRAY; public static final byte[] TRUE = new byte[] {1}; + /** * Separator used between variable length keys for a composite key. @@ -195,6 +196,16 @@ public interface QueryConstants { public static final ImmutableBytesPtr DEFAULT_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr( DEFAULT_COLUMN_FAMILY_BYTES); + public static final String LOCAL_INDEX_COLUMN_FAMILY_PREFIX = "L#"; + public static final byte[] LOCAL_INDEX_COLUMN_FAMILY_PREFIX_BYTES = Bytes.toBytes(LOCAL_INDEX_COLUMN_FAMILY_PREFIX); + public static final ImmutableBytesPtr LOCAL_INDEX_COLUMN_FAMILY_PREFIX_PTR = new ImmutableBytesPtr( + LOCAL_INDEX_COLUMN_FAMILY_PREFIX_BYTES); + + public static final String DEFAULT_LOCAL_INDEX_COLUMN_FAMILY = LOCAL_INDEX_COLUMN_FAMILY_PREFIX + DEFAULT_COLUMN_FAMILY; + public static final byte[] DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES = Bytes.toBytes(DEFAULT_LOCAL_INDEX_COLUMN_FAMILY); + public static final ImmutableBytesPtr DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr( + DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES); + public static final String ALL_FAMILY_PROPERTIES_KEY = ""; public static final String SYSTEM_TABLE_PK_NAME = "pk";
