http://git-wip-us.apache.org/repos/asf/phoenix/blob/6593f1af/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java index 146028e..3353f9e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java @@ -17,677 +17,8 @@ */ package org.apache.phoenix.hbase.index.balancer; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HBaseIOException; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.master.LoadBalancer; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.RegionPlan; -import org.apache.hadoop.hbase.master.RegionState; -import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.util.EnvironmentEdgeManager; -import org.apache.hadoop.util.ReflectionUtils; - -/** - * <p>This class is an extension of the load balancer class. - * It allows to co-locate the regions of the user table and the regions of corresponding - * index table if any.</p> - * - * </>roundRobinAssignment, retainAssignment -> index regions will follow the actual table regions. - * randomAssignment, balancerCluster -> either index table or actual table region(s) will follow - * each other based on which ever comes first.</p> - * - * <p>In case of master failover there is a chance that the znodes of the index - * table and actual table are left behind. Then in that scenario we may get randomAssignment for - * either the actual table region first or the index table region first.</p> - * - * <p>In case of balancing by table any table can balance first.</p> - * - */ - -public class IndexLoadBalancer implements LoadBalancer { - - private static final Log LOG = LogFactory.getLog(IndexLoadBalancer.class); - - public static final byte[] PARENT_TABLE_KEY = Bytes.toBytes("PARENT_TABLE"); - - public static final String INDEX_BALANCER_DELEGATOR = "hbase.index.balancer.delegator.class"; - - private LoadBalancer delegator; - - private MasterServices master; - - private Configuration conf; - - private ClusterStatus clusterStatus; - - private static final Random RANDOM = new Random(EnvironmentEdgeManager.currentTimeMillis()); - - Map<TableName, TableName> userTableVsIndexTable = new HashMap<TableName, TableName>(); - - Map<TableName, TableName> indexTableVsUserTable = new HashMap<TableName, TableName>(); - - /** - * Maintains colocation information of user regions and corresponding index regions. - */ - private Map<TableName, Map<ImmutableBytesWritable, ServerName>> colocationInfo = - new ConcurrentHashMap<TableName, Map<ImmutableBytesWritable, ServerName>>(); - - private Set<TableName> balancedTables = new HashSet<TableName>(); - - private boolean stopped = false; - - @Override - public void initialize() throws HBaseIOException { - Class<? extends LoadBalancer> delegatorKlass = - conf.getClass(INDEX_BALANCER_DELEGATOR, StochasticLoadBalancer.class, - LoadBalancer.class); - this.delegator = ReflectionUtils.newInstance(delegatorKlass, conf); - this.delegator.setClusterStatus(clusterStatus); - this.delegator.setMasterServices(this.master); - this.delegator.initialize(); - try { - populateTablesToColocate(this.master.getTableDescriptors().getAll()); - } catch (IOException e) { - throw new HBaseIOException(e); - } - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration configuration) { - this.conf = configuration; - } - - @Override - public void onConfigurationChange(Configuration conf) { - setConf(conf); - } - - @Override - public void setClusterStatus(ClusterStatus st) { - this.clusterStatus = st; - } - - public Map<TableName, Map<ImmutableBytesWritable, ServerName>> getColocationInfo() { - return colocationInfo; - } - - @Override - public void setMasterServices(MasterServices masterServices) { - this.master = masterServices; - } - - @Override - public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) - throws HBaseIOException { - synchronized (this.colocationInfo) { - boolean balanceByTable = conf.getBoolean("hbase.master.loadbalance.bytable", false); - List<RegionPlan> regionPlans = null; - - TableName tableName = null; - if (balanceByTable) { - Map<ImmutableBytesWritable, ServerName> tableKeys = null; - for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : clusterState - .entrySet()) { - ServerName sn = serverVsRegionList.getKey(); - List<HRegionInfo> regionInfos = serverVsRegionList.getValue(); - if (regionInfos.isEmpty()) { - continue; - } - if (!isTableColocated(regionInfos.get(0).getTable())) { - return this.delegator.balanceCluster(clusterState); - } - // Just get the table name from any one of the values in the regioninfo list - if (tableName == null) { - tableName = regionInfos.get(0).getTable(); - tableKeys = this.colocationInfo.get(tableName); - } - // Check and modify the colocation info map based on values of cluster state - // because we - // will - // call balancer only when the cluster is in stable and reliable state. - if (tableKeys != null) { - for (HRegionInfo hri : regionInfos) { - updateServer(tableKeys, sn, hri); - } - } - } - // If user table is already balanced find the index table plans from the user table - // plans - // or vice verca. - TableName mappedTableName = getMappedTableToColocate(tableName); - if (balancedTables.contains(mappedTableName)) { - balancedTables.remove(mappedTableName); - regionPlans = new ArrayList<RegionPlan>(); - return prepareRegionPlansForClusterState(clusterState, regionPlans); - } else { - balancedTables.add(tableName); - regionPlans = this.delegator.balanceCluster(clusterState); - if (regionPlans == null) { - if (LOG.isDebugEnabled()) { - LOG.debug(tableName + " regions already balanced."); - } - return null; - } else { - updateRegionPlans(regionPlans); - return regionPlans; - } - } - - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Seperating user tables and index tables regions of " - + "each region server in the cluster."); - } - Map<ServerName, List<HRegionInfo>> userClusterState = - new HashMap<ServerName, List<HRegionInfo>>(); - Map<ServerName, List<HRegionInfo>> indexClusterState = - new HashMap<ServerName, List<HRegionInfo>>(); - for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : clusterState - .entrySet()) { - ServerName sn = serverVsRegionList.getKey(); - List<HRegionInfo> regionsInfos = serverVsRegionList.getValue(); - List<HRegionInfo> idxRegionsToBeMoved = new ArrayList<HRegionInfo>(); - List<HRegionInfo> userRegionsToBeMoved = new ArrayList<HRegionInfo>(); - for (HRegionInfo hri : regionsInfos) { - if (hri.isMetaRegion()) { - continue; - } - tableName = hri.getTable(); - // Check and modify the colocation info map based on values of cluster state - // because we - // will - // call balancer only when the cluster is in stable and reliable state. - if (isTableColocated(tableName)) { - // table name may change every time thats why always need to get table - // entries. - Map<ImmutableBytesWritable, ServerName> tableKeys = - this.colocationInfo.get(tableName); - if (tableKeys != null) { - updateServer(tableKeys, sn, hri); - } - } - if (indexTableVsUserTable.containsKey(tableName)) { - idxRegionsToBeMoved.add(hri); - continue; - } - userRegionsToBeMoved.add(hri); - } - // there may be dummy entries here if assignments by table is set - userClusterState.put(sn, userRegionsToBeMoved); - indexClusterState.put(sn, idxRegionsToBeMoved); - } - - regionPlans = this.delegator.balanceCluster(userClusterState); - if (regionPlans == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("User region plan is null."); - } - regionPlans = new ArrayList<RegionPlan>(); - } else { - updateRegionPlans(regionPlans); - } - return prepareRegionPlansForClusterState(indexClusterState, regionPlans); - } - } - } - - private void updateServer(Map<ImmutableBytesWritable, ServerName> tableKeys, ServerName sn, - HRegionInfo hri) { - ImmutableBytesWritable startKey = new ImmutableBytesWritable(hri.getStartKey()); - ServerName existingServer = tableKeys.get(startKey); - if (!sn.equals(existingServer)) { - if (LOG.isDebugEnabled()) { - LOG.debug("There is a mismatch in the existing server name for the region " + hri - + ". Replacing the server " + existingServer + " with " + sn + "."); - } - tableKeys.put(startKey, sn); - } - } - - /** - * Prepare region plans for cluster state - * @param clusterState if balancing is table wise then cluster state contains only indexed or - * index table regions, otherwise it contains all index tables regions. - * @param regionPlans - * @return - */ - private List<RegionPlan> prepareRegionPlansForClusterState( - Map<ServerName, List<HRegionInfo>> clusterState, List<RegionPlan> regionPlans) { - if (regionPlans == null) regionPlans = new ArrayList<RegionPlan>(); - ImmutableBytesWritable startKey = new ImmutableBytesWritable(); - for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : clusterState.entrySet()) { - List<HRegionInfo> regionInfos = serverVsRegionList.getValue(); - ServerName server = serverVsRegionList.getKey(); - for (HRegionInfo regionInfo : regionInfos) { - if (!isTableColocated(regionInfo.getTable())) continue; - TableName mappedTableName = getMappedTableToColocate(regionInfo.getTable()); - startKey.set(regionInfo.getStartKey()); - ServerName sn = this.colocationInfo.get(mappedTableName).get(startKey); - if (sn.equals(server)) { - continue; - } else { - RegionPlan rp = new RegionPlan(regionInfo, server, sn); - if (LOG.isDebugEnabled()) { - LOG.debug("Selected server " + rp.getDestination() - + " as destination for region " - + regionInfo.getRegionNameAsString() + " from colocation info."); - } - regionOnline(regionInfo, rp.getDestination()); - regionPlans.add(rp); - } - } - } - return regionPlans; - } - - private void updateRegionPlans(List<RegionPlan> regionPlans) { - for (RegionPlan regionPlan : regionPlans) { - HRegionInfo hri = regionPlan.getRegionInfo(); - if (!isTableColocated(hri.getTable())) continue; - if (LOG.isDebugEnabled()) { - LOG.debug("Saving region plan of region " + hri.getRegionNameAsString() + '.'); - } - regionOnline(hri, regionPlan.getDestination()); - } - } - - @Override - public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions, - List<ServerName> servers) throws HBaseIOException { - List<HRegionInfo> userRegions = new ArrayList<HRegionInfo>(); - List<HRegionInfo> indexRegions = new ArrayList<HRegionInfo>(); - for (HRegionInfo hri : regions) { - seperateUserAndIndexRegion(hri, userRegions, indexRegions); - } - Map<ServerName, List<HRegionInfo>> bulkPlan = null; - if (!userRegions.isEmpty()) { - bulkPlan = this.delegator.roundRobinAssignment(userRegions, servers); - // This should not happen. - if (null == bulkPlan) { - if (LOG.isDebugEnabled()) { - LOG.debug("No region plans selected for user regions in roundRobinAssignment."); - } - return null; - } - savePlan(bulkPlan); - } - bulkPlan = prepareIndexRegionsPlan(indexRegions, bulkPlan, servers); - return bulkPlan; - } - - private void seperateUserAndIndexRegion(HRegionInfo hri, List<HRegionInfo> userRegions, - List<HRegionInfo> indexRegions) { - if (indexTableVsUserTable.containsKey(hri.getTable())) { - indexRegions.add(hri); - return; - } - userRegions.add(hri); - } - - private Map<ServerName, List<HRegionInfo>> prepareIndexRegionsPlan( - List<HRegionInfo> indexRegions, Map<ServerName, List<HRegionInfo>> bulkPlan, - List<ServerName> servers) throws HBaseIOException { - if (null != indexRegions && !indexRegions.isEmpty()) { - if (null == bulkPlan) { - bulkPlan = new ConcurrentHashMap<ServerName, List<HRegionInfo>>(); - } - for (HRegionInfo hri : indexRegions) { - if (LOG.isDebugEnabled()) { - LOG.debug("Preparing region plan for index region " - + hri.getRegionNameAsString() + '.'); - } - ServerName destServer = getDestServerForIdxRegion(hri); - List<HRegionInfo> destServerRegions = null; - if (destServer == null) destServer = this.randomAssignment(hri, servers); - if (destServer != null) { - destServerRegions = bulkPlan.get(destServer); - if (null == destServerRegions) { - destServerRegions = new ArrayList<HRegionInfo>(); - bulkPlan.put(destServer, destServerRegions); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Server " + destServer + " selected for region " - + hri.getRegionNameAsString() + '.'); - } - destServerRegions.add(hri); - regionOnline(hri, destServer); - } - } - } - return bulkPlan; - } - - private ServerName getDestServerForIdxRegion(HRegionInfo hri) { - // Every time we calculate the table name because in case of master restart the index - // regions - // may be coming for different index tables. - TableName actualTable = getMappedTableToColocate(hri.getTable()); - ImmutableBytesWritable startkey = new ImmutableBytesWritable(hri.getStartKey()); - synchronized (this.colocationInfo) { - - Map<ImmutableBytesWritable, ServerName> tableKeys = colocationInfo.get(actualTable); - if (null == tableKeys) { - // Can this case come - return null; - } - if (tableKeys.containsKey(startkey)) { - // put index region location if corresponding user region found in regionLocation - // map. - ServerName sn = tableKeys.get(startkey); - regionOnline(hri, sn); - return sn; - } - } - return null; - } - - private void savePlan(Map<ServerName, List<HRegionInfo>> bulkPlan) { - synchronized (this.colocationInfo) { - for (Entry<ServerName, List<HRegionInfo>> e : bulkPlan.entrySet()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Saving user regions' plans for server " + e.getKey() + '.'); - } - for (HRegionInfo hri : e.getValue()) { - if (!isTableColocated(hri.getTable())) continue; - regionOnline(hri, e.getKey()); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Saved user regions' plans for server " + e.getKey() + '.'); - } - } - } - } - - @Override - public Map<ServerName, List<HRegionInfo>> retainAssignment( - Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException { - Map<HRegionInfo, ServerName> userRegionsMap = - new ConcurrentHashMap<HRegionInfo, ServerName>(); - List<HRegionInfo> indexRegions = new ArrayList<HRegionInfo>(); - for (Entry<HRegionInfo, ServerName> e : regions.entrySet()) { - seperateUserAndIndexRegion(e, userRegionsMap, indexRegions, servers); - } - Map<ServerName, List<HRegionInfo>> bulkPlan = null; - if (!userRegionsMap.isEmpty()) { - bulkPlan = this.delegator.retainAssignment(userRegionsMap, servers); - if (bulkPlan == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Empty region plan for user regions."); - } - return null; - } - savePlan(bulkPlan); - } - bulkPlan = prepareIndexRegionsPlan(indexRegions, bulkPlan, servers); - return bulkPlan; - } - - private void seperateUserAndIndexRegion(Entry<HRegionInfo, ServerName> e, - Map<HRegionInfo, ServerName> userRegionsMap, List<HRegionInfo> indexRegions, - List<ServerName> servers) { - HRegionInfo hri = e.getKey(); - if (indexTableVsUserTable.containsKey(hri.getTable())) { - indexRegions.add(hri); - return; - } - if (e.getValue() == null) { - userRegionsMap.put(hri, servers.get(RANDOM.nextInt(servers.size()))); - } else { - userRegionsMap.put(hri, e.getValue()); - } - } - - @Override - public Map<HRegionInfo, ServerName> immediateAssignment(List<HRegionInfo> regions, - List<ServerName> servers) throws HBaseIOException { - return this.delegator.immediateAssignment(regions, servers); - } - - @Override - public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers) - throws HBaseIOException { - if (!isTableColocated(regionInfo.getTable())) { - return this.delegator.randomAssignment(regionInfo, servers); - } - ServerName sn = getServerNameFromMap(regionInfo, servers); - if (sn == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("No server found for region " + regionInfo.getRegionNameAsString() + '.'); - } - sn = getRandomServer(regionInfo, servers); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Destination server for region " + regionInfo.getRegionNameAsString() - + " is " + ((sn == null) ? "null" : sn.toString()) + '.'); - } - return sn; - } - - private ServerName getRandomServer(HRegionInfo regionInfo, List<ServerName> servers) - throws HBaseIOException { - ServerName sn = null; - sn = this.delegator.randomAssignment(regionInfo, servers); - if (sn == null) return null; - regionOnline(regionInfo, sn); - return sn; - } - - private ServerName getServerNameFromMap(HRegionInfo regionInfo, List<ServerName> onlineServers) { - TableName tableName = regionInfo.getTable(); - TableName mappedTable = getMappedTableToColocate(regionInfo.getTable()); - ImmutableBytesWritable startKey = new ImmutableBytesWritable(regionInfo.getStartKey()); - synchronized (this.colocationInfo) { - Map<ImmutableBytesWritable, ServerName> correspondingTableKeys = - this.colocationInfo.get(mappedTable); - Map<ImmutableBytesWritable, ServerName> actualTableKeys = - this.colocationInfo.get(tableName); - - if (null != correspondingTableKeys) { - if (correspondingTableKeys.containsKey(startKey)) { - ServerName previousServer = null; - if (null != actualTableKeys) { - previousServer = actualTableKeys.get(startKey); - } - ServerName sn = correspondingTableKeys.get(startKey); - if (null != previousServer) { - // if servername of index region and user region are same in colocationInfo - // clean - // previous plans and return null - if (previousServer.equals(sn)) { - correspondingTableKeys.remove(startKey); - actualTableKeys.remove(startKey); - if (LOG.isDebugEnabled()) { - LOG - .debug("Both user region plan and corresponding index region plan " - + "in colocation info are same. Hence clearing the plans to select new plan" - + " for the region " - + regionInfo.getRegionNameAsString() + "."); - } - return null; - } - } - if (sn != null && onlineServers.contains(sn)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Updating the region plan of the region " - + regionInfo.getRegionNameAsString() + " with server " + sn); - } - regionOnline(regionInfo, sn); - return sn; - } else if (sn != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("The location " + sn + " of region with start key" - + Bytes.toStringBinary(regionInfo.getStartKey()) - + " is not in online. Selecting other region server."); - } - return null; - } - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("No region plans in colocationInfo for table " + mappedTable); - } - } - return null; - } - } - - @Override - public void regionOnline(HRegionInfo regionInfo, ServerName sn) { - TableName tableName = regionInfo.getTable(); - synchronized (this.colocationInfo) { - Map<ImmutableBytesWritable, ServerName> tabkeKeys = this.colocationInfo.get(tableName); - if (tabkeKeys == null) { - tabkeKeys = new ConcurrentHashMap<ImmutableBytesWritable, ServerName>(); - this.colocationInfo.put(tableName, tabkeKeys); - } - tabkeKeys.put(new ImmutableBytesWritable(regionInfo.getStartKey()), sn); - } - } - - public void clearTableRegionPlans(TableName tableName) { - if (LOG.isDebugEnabled()) { - LOG.debug("Clearing regions plans from colocationInfo for table " + tableName); - } - synchronized (this.colocationInfo) { - this.colocationInfo.remove(tableName); - } - } - - @Override - public void regionOffline(HRegionInfo regionInfo) { - TableName tableName = regionInfo.getTable(); - synchronized (this.colocationInfo) { - Map<ImmutableBytesWritable, ServerName> tableKeys = this.colocationInfo.get(tableName); - if (null == tableKeys) { - if (LOG.isDebugEnabled()) { - LOG.debug("No regions of table " + tableName + " in the colocationInfo."); - } - } else { - tableKeys.remove(new ImmutableBytesWritable(regionInfo.getStartKey())); - if (LOG.isDebugEnabled()) { - LOG.debug("The regioninfo " + regionInfo + " removed from the colocationInfo"); - } - } - } - } - - @Override - public boolean isStopped() { - return stopped; - } - - @Override - public void stop(String why) { - LOG.info("Load Balancer stop requested: " + why); - stopped = true; - } - - public void populateTablesToColocate(Map<String, HTableDescriptor> tableDescriptors) { - HTableDescriptor desc = null; - for (Entry<String, HTableDescriptor> entry : tableDescriptors.entrySet()) { - desc = entry.getValue(); - if (desc.getValue(PARENT_TABLE_KEY) != null) { - addTablesToColocate(TableName.valueOf(desc.getValue(PARENT_TABLE_KEY)), desc - .getTableName()); - } - } - } - - /** - * Add tables whose regions to co-locate. - * @param userTable - * @param indexTable - */ - public void addTablesToColocate(TableName userTable, TableName indexTable) { - if (userTable.equals(indexTable)) { - throw new IllegalArgumentException("Tables to colocate should not be same."); - } else if (isTableColocated(userTable)) { - throw new IllegalArgumentException("User table already colocated with table " - + getMappedTableToColocate(userTable)); - } else if (isTableColocated(indexTable)) { - throw new IllegalArgumentException("Index table is already colocated with table " - + getMappedTableToColocate(indexTable)); - } - userTableVsIndexTable.put(userTable, indexTable); - indexTableVsUserTable.put(indexTable, userTable); - } - - /** - * Removes the specified table and corresponding table from co-location. - * @param table - */ - public void removeTablesFromColocation(TableName table) { - TableName other = userTableVsIndexTable.remove(table); - if (other != null) { - indexTableVsUserTable.remove(other); - } else { - other = indexTableVsUserTable.remove(table); - if (other != null) userTableVsIndexTable.remove(other); - } - } - - /** - * Return mapped table to co-locate. - * @param tableName - * @return index table if the specified table is user table or vice versa. - */ - public TableName getMappedTableToColocate(TableName tableName) { - TableName other = userTableVsIndexTable.get(tableName); - return other == null ? indexTableVsUserTable.get(tableName) : other; - } - public boolean isTableColocated(TableName table) { - return userTableVsIndexTable.containsKey(table) || indexTableVsUserTable.containsKey(table); - } +public class IndexLoadBalancer extends StochasticLoadBalancer { - /** - * Populates table's region locations into co-location info from master. - * @param table - */ - public void populateRegionLocations(TableName table) { - synchronized (this.colocationInfo) { - if (!isTableColocated(table)) { - throw new IllegalArgumentException("Specified table " + table - + " should be in one of the tables to co-locate."); - } - RegionStates regionStates = this.master.getAssignmentManager().getRegionStates(); - List<HRegionInfo> onlineRegions = regionStates.getRegionsOfTable(table); - for (HRegionInfo hri : onlineRegions) { - regionOnline(hri, regionStates.getRegionServerOfRegion(hri)); - } - Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition(); - for (RegionState regionState : regionsInTransition.values()) { - if (table.equals(regionState.getRegion().getTable()) - && regionState.getServerName() != null) { - regionOnline(regionState.getRegion(), regionState.getServerName()); - } - } - } - } }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6593f1af/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java index a014da2..2f83f8d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java @@ -17,98 +17,12 @@ */ package org.apache.phoenix.hbase.index.master; -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver; -import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.master.AssignmentManager; -import org.apache.hadoop.hbase.master.LoadBalancer; -import org.apache.hadoop.hbase.master.RegionPlan; -import org.apache.hadoop.hbase.master.RegionStates; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.hbase.index.balancer.IndexLoadBalancer; -import org.apache.phoenix.util.MetaDataUtil; /** * Defines of coprocessor hooks(to support secondary indexing) of operations on * {@link org.apache.hadoop.hbase.master.HMaster} process. */ public class IndexMasterObserver extends BaseMasterObserver { - IndexLoadBalancer balancer = null; - - @Override - public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> ctx) - throws IOException { - LoadBalancer loadBalancer = - ctx.getEnvironment().getMasterServices().getAssignmentManager().getBalancer(); - if (loadBalancer instanceof IndexLoadBalancer) { - balancer = (IndexLoadBalancer) loadBalancer; - } - super.preMasterInitialization(ctx); - } - - @Override - public void preCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx, - HTableDescriptor desc, HRegionInfo[] regions) throws IOException { - TableName userTableName = null; - if (balancer != null && desc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) != null) { - userTableName = - TableName.valueOf(desc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY)); - balancer.addTablesToColocate(userTableName, desc.getTableName()); - } - if (userTableName != null) balancer.populateRegionLocations(userTableName); - super.preCreateTableHandler(ctx, desc, regions); - } - - @Override - public void preModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx, - TableName tableName, HTableDescriptor htd) throws IOException { - HTableDescriptor oldDesc = - ctx.getEnvironment().getMasterServices().getTableDescriptors().get(tableName); - if (oldDesc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) == null - && htd.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) != null) { - TableName userTableName = - TableName.valueOf(htd.getValue(IndexLoadBalancer.PARENT_TABLE_KEY)); - balancer.addTablesToColocate(userTableName, htd.getTableName()); - } - super.preModifyTableHandler(ctx, tableName, htd); - } - - @Override - public void postMove(ObserverContext<MasterCoprocessorEnvironment> ctx, HRegionInfo region, - ServerName srcServer, ServerName destServer) throws IOException { - if (balancer != null && balancer.isTableColocated(region.getTable())) { - AssignmentManager am = ctx.getEnvironment().getMasterServices().getAssignmentManager(); - RegionStates regionStates = am.getRegionStates(); - String tableName = region.getTable().getNameAsString(); - String correspondingTable = MetaDataUtil.isLocalIndex(region.getTable().getNameAsString()) - ? MetaDataUtil.getUserTableName(tableName) - : Bytes.toString(MetaDataUtil.getLocalIndexPhysicalName(tableName.getBytes())); - List<HRegionInfo> regions = - regionStates.getRegionsOfTable(TableName.valueOf(correspondingTable)); - for (HRegionInfo hri : regions) { - if (Bytes.compareTo(region.getStartKey(), hri.getStartKey()) == 0 - && destServer != null) { - balancer.regionOnline(hri, destServer); - am.addPlan(hri.getEncodedName(), new RegionPlan(hri, null, destServer)); - am.unassign(hri); - } - } - } - super.postMove(ctx, region, srcServer, destServer); - } - @Override - public void postDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx, - TableName tableName) throws IOException { - if (balancer != null && balancer.isTableColocated(tableName)) { - balancer.removeTablesFromColocation(tableName); - } - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6593f1af/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java index d7fef5e..5e3f3ed 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java @@ -32,6 +32,6 @@ public interface IndexCommitter extends Stoppable { void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name); - public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) + public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, boolean allowLocalUpdates) throws IndexWriteException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/6593f1af/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java index 30797b2..cbcec3b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java @@ -128,10 +128,11 @@ public class IndexWriter implements Stoppable { * @param indexUpdates Updates to write * @throws IOException */ - public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException { + public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates, + boolean allowLocalUpdates) throws IOException { // convert the strings to htableinterfaces to which we can talk and group by TABLE Multimap<HTableInterfaceReference, Mutation> toWrite = resolveTableReferences(indexUpdates); - writeAndKillYourselfOnFailure(toWrite); + writeAndKillYourselfOnFailure(toWrite, allowLocalUpdates); } /** @@ -139,9 +140,10 @@ public class IndexWriter implements Stoppable { * @param toWrite * @throws IOException */ - public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite) throws IOException { + public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite, + boolean allowLocalUpdates) throws IOException { try { - write(toWrite); + write(toWrite, allowLocalUpdates); if (LOG.isTraceEnabled()) { LOG.trace("Done writing all index updates!\n\t" + toWrite); } @@ -165,21 +167,24 @@ public class IndexWriter implements Stoppable { * @throws IndexWriteException if we cannot successfully write to the index. Whether or not we * stop early depends on the {@link IndexCommitter}. */ - public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws IndexWriteException { - write(resolveTableReferences(toWrite)); - } + public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws IndexWriteException { + write(resolveTableReferences(toWrite), false); + } + + public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates) throws IndexWriteException { + write(resolveTableReferences(toWrite), allowLocalUpdates); + } /** * see {@link #write(Collection)} * @param toWrite * @throws IndexWriteException */ - public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) - throws IndexWriteException { - this.writer.write(toWrite); + public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, boolean allowLocalUpdates) + throws IndexWriteException { + this.writer.write(toWrite, allowLocalUpdates); } - /** * Convert the passed index updates to {@link HTableInterfaceReference}s. * @param indexUpdates from the index builder http://git-wip-us.apache.org/repos/asf/phoenix/blob/6593f1af/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java index e52c216..b954a69 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java @@ -21,7 +21,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -97,7 +99,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { } @Override - public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) throws SingleIndexWriteFailureException { + public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws SingleIndexWriteFailureException { /* * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the writes in * parallel to each index table, so each table gets its own task and is submitted to the pool. Where it gets @@ -115,7 +117,12 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { // doing a complete copy over of all the index update for each table. final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue()); final HTableInterfaceReference tableReference = entry.getKey(); - final RegionCoprocessorEnvironment env = this.env; + if (env != null + && !allowLocalUpdates + && tableReference.getTableName().equals( + env.getRegion().getTableDesc().getNameAsString())) { + continue; + } /* * Write a batch of index updates to an index table. This operation stops (is cancelable) via two * mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread. @@ -144,24 +151,22 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { LOG.debug("Writing index update:" + mutations + " to table: " + tableReference); } try { - // TODO: Once HBASE-11766 is fixed, reexamine whether this is necessary. - // Also, checking the prefix of the table name to determine if this is a local - // index is pretty hacky. If we're going to keep this, we should revisit that - // as well. - try { - if (MetaDataUtil.isLocalIndex(tableReference.getTableName())) { - HRegion indexRegion = IndexUtil.getIndexRegion(env); - if (indexRegion != null) { - throwFailureIfDone(); - indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()])); - return null; - } + if (allowLocalUpdates && env != null) { + for (Mutation m : mutations) { + m.setDurability(Durability.SKIP_WAL); } - } catch (IOException ignord) { - // when it's failed we fall back to the standard & slow way - if (LOG.isDebugEnabled()) { - LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error=" - + ignord); + try { + throwFailureIfDone(); + env.getRegion().batchMutate( + mutations.toArray(new Mutation[mutations.size()]), + HConstants.NO_NONCE, HConstants.NO_NONCE); + return null; + } catch (IOException ignord) { + // when it's failed we fall back to the standard & slow way + if (LOG.isDebugEnabled()) { + LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error=" + + ignord); + } } } HTableInterface table = factory.getTable(tableReference.get()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6593f1af/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java index 131b930..1efbef1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java @@ -23,7 +23,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -109,7 +111,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { } @Override - public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) throws MultiIndexWriteFailureException { + public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws MultiIndexWriteFailureException { Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet(); TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size()); List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size()); @@ -120,6 +122,12 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { // track each reference so we can get at it easily later, when determing failures final HTableInterfaceReference tableReference = entry.getKey(); final RegionCoprocessorEnvironment env = this.env; + if (env != null + && !allowLocalUpdates + && tableReference.getTableName().equals( + env.getRegion().getTableDesc().getNameAsString())) { + continue; + } tables.add(tableReference); /* @@ -143,32 +151,27 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { try { // this may have been queued, but there was an abort/stop so we try to early exit throwFailureIfDone(); - if (LOG.isDebugEnabled()) { LOG.debug("Writing index update:" + mutations + " to table: " + tableReference); } - - try { - // TODO: Once HBASE-11766 is fixed, reexamine whether this is necessary. - // Also, checking the prefix of the table name to determine if this is a local - // index is pretty hacky. If we're going to keep this, we should revisit that - // as well. - if (MetaDataUtil.isLocalIndex(tableReference.getTableName())) { - HRegion indexRegion = IndexUtil.getIndexRegion(env); - if (indexRegion != null) { - throwFailureIfDone(); - indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()])); - return Boolean.TRUE; - } + if (allowLocalUpdates && env!=null && tableReference.getTableName().equals( + env.getRegion().getTableDesc().getNameAsString())) { + for (Mutation m : mutations) { + m.setDurability(Durability.SKIP_WAL); } - } catch (IOException ignord) { - // when it's failed we fall back to the standard & slow way - if (LOG.isDebugEnabled()) { - LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error=" - + ignord); + try { + throwFailureIfDone(); + env.getRegion().batchMutate(mutations.toArray(new Mutation[mutations.size()]), + HConstants.NO_NONCE, HConstants.NO_NONCE); + return Boolean.TRUE; + } catch (IOException ignord) { + // when it's failed we fall back to the standard & slow way + if (LOG.isDebugEnabled()) { + LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error=" + + ignord); + } } } - HTableInterface table = factory.getTable(tableReference.get()); throwFailureIfDone(); table.batch(mutations); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6593f1af/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 1725b11..6a316d9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -274,6 +274,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key) private Set<ColumnReference> indexedColumns; private Set<ColumnReference> coveredColumns; + private Map<ColumnReference, ColumnReference> coveredColumnsMap; // columns required to create index row i.e. indexedColumns + coveredColumns (this does not include columns in the data row key) private Set<ColumnReference> allColumns; // TODO remove this in the next major release @@ -363,6 +364,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { this.indexedColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); this.indexedExpressions = Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); this.coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns); + this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns-nIndexPKColumns); this.nIndexSaltBuckets = nIndexSaltBuckets == null ? 0 : nIndexSaltBuckets; this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable); this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index); @@ -430,6 +432,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { for (PColumn indexColumn : family.getColumns()) { PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); this.coveredColumns.add(new ColumnReference(column.getFamilyName().getBytes(), column.getName().getBytes())); + if(isLocalIndex) { + this.coveredColumnsMap.put( + new ColumnReference(column.getFamilyName().getBytes(), column.getName() + .getBytes()), + new ColumnReference(isLocalIndex ? Bytes.toBytes(IndexUtil + .getLocalIndexColumnFamily(column.getFamilyName().getString())) + : column.getFamilyName().getBytes(), column.getName().getBytes())); + } } } this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize); @@ -861,7 +871,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } //this is a little bit of extra work for installations that are running <0.94.14, but that should be rare and is a short-term set of wrappers - it shouldn't kill GC - put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), cq, ts, value)); + if(this.isLocalIndex) { + ColumnReference columnReference = this.coveredColumnsMap.get(ref); + put.add(kvBuilder.buildPut(rowKey, columnReference.getFamilyWritable(), cq, ts, value)); + } else { + put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), cq, ts, value)); + } } } return put; @@ -949,11 +964,17 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // If table delete was single version, then index delete should be as well if (deleteType == DeleteType.SINGLE_VERSION) { for (ColumnReference ref : getCoverededColumns()) { // FIXME: Keep Set<byte[]> for index CFs? + if(this.isLocalIndex) { + ref = this.coveredColumnsMap.get(ref); + } delete.deleteFamilyVersion(ref.getFamily(), ts); } delete.deleteFamilyVersion(emptyCF, ts); } else { for (ColumnReference ref : getCoverededColumns()) { // FIXME: Keep Set<byte[]> for index CFs? + if(this.isLocalIndex) { + ref = this.coveredColumnsMap.get(ref); + } delete.deleteFamily(ref.getFamily(), ts); } delete.deleteFamily(emptyCF, ts); @@ -971,11 +992,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { delete = new Delete(indexRowKey); delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } + ColumnReference columnReference = ref; + if(this.isLocalIndex) { + columnReference = this.coveredColumnsMap.get(ref); + } // If point delete for data table, then use point delete for index as well if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { - delete.deleteColumn(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + delete.deleteColumn(columnReference.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); } else { - delete.deleteColumns(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + delete.deleteColumns(columnReference.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); } } } @@ -1030,10 +1055,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0; int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1; coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns); + coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns); for (int i = 0; i < nCoveredColumns; i++) { byte[] cf = Bytes.readByteArray(input); byte[] cq = Bytes.readByteArray(input); - coveredColumns.add(new ColumnReference(cf,cq)); + ColumnReference ref = new ColumnReference(cf,cq); + coveredColumns.add(ref); + if(isLocalIndex) { + coveredColumnsMap.put(ref, new ColumnReference(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(Bytes.toString(cf))), cq)); + } } // Hack to serialize whether the index row key is optimizable int len = WritableUtils.readVInt(input); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6593f1af/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 8ad4d3e..9d2955b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -71,7 +71,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec { Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations()); ValueGetter valueGetter = statePair.getFirst(); IndexUpdate indexUpdate = statePair.getSecond(); - indexUpdate.setTable(maintainer.getIndexTableName()); + indexUpdate.setTable(maintainer.isLocalIndex() ? state.getEnvironment().getRegion() + .getTableDesc().getName() : maintainer.getIndexTableName()); Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env .getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey()); indexUpdate.setUpdate(put); @@ -95,7 +96,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec { Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations()); ValueGetter valueGetter = statePair.getFirst(); IndexUpdate indexUpdate = statePair.getSecond(); - indexUpdate.setTable(maintainer.getIndexTableName()); + indexUpdate.setTable(maintainer.isLocalIndex() ? state.getEnvironment().getRegion() + .getTableDesc().getName() : maintainer.getIndexTableName()); Delete delete = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(), state.getCurrentTimestamp(), env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey()); indexUpdate.setUpdate(delete); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6593f1af/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index 17da04e..d7850ba 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -138,7 +138,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { } // its a local index table, so we need to convert it to the index table names we should disable - if (MetaDataUtil.isLocalIndex(ref.getTableName())) { + if (MetaDataUtil.hasLocalIndexColumnFamily(env.getRegion().getTableDesc())) { for (String tableName : getLocalIndexNames(ref, mutations)) { indexTableNames.put(tableName, minTimeStamp); } @@ -224,8 +224,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { try { conn = QueryUtil.getConnectionOnServer(this.env.getConfiguration()).unwrap( PhoenixConnection.class); - String userTableName = MetaDataUtil.getUserTableName(ref.getTableName()); - PTable dataTable = PhoenixRuntime.getTable(conn, userTableName); + PTable dataTable = PhoenixRuntime.getTable(conn, ref.getTableName()); List<PTable> indexes = dataTable.getIndexes(); // local index used to get view id from index mutation row key. PTable localIndex = null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/6593f1af/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 5d8879c..c50cb94 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,8 +39,11 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -48,6 +52,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.htrace.Span; @@ -62,6 +67,7 @@ import org.apache.phoenix.hbase.index.covered.TableState; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; +import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.query.KeyRange; @@ -69,6 +75,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; @@ -95,6 +102,8 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { private PhoenixIndexCodec codec; private IndexWriter writer; private boolean stopped; + private Map<Long, Collection<Pair<Mutation, byte[]>>> localUpdates = + new ConcurrentHashMap<Long, Collection<Pair<Mutation, byte[]>>>(); @Override public void start(CoprocessorEnvironment e) throws IOException { @@ -147,6 +156,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } Map<String,byte[]> updateAttributes = m.getAttributesMap(); + String tableName = c.getEnvironment().getRegion().getTableDesc().getNameAsString(); PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes); byte[] txRollbackAttribute = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY); Collection<Pair<Mutation, byte[]>> indexUpdates = null; @@ -159,13 +169,25 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { // get the index updates for all elements in this batch indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute); - + Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator(); + List<Pair<Mutation, byte[]>> localIndexUpdates = new ArrayList<Pair<Mutation, byte[]>>(indexUpdates.size()); + while(indexUpdatesItr.hasNext()) { + Pair<Mutation, byte[]> next = indexUpdatesItr.next(); + if(tableName.equals(Bytes.toString(next.getSecond()))) { + localIndexUpdates.add(next); + indexUpdatesItr.remove(); + } + } + if(!localIndexUpdates.isEmpty()) { + byte[] bs = indexMetaData.getAttributes().get(PhoenixIndexCodec.INDEX_UUID); + localUpdates.put(Bytes.toLong(bs), localIndexUpdates); + } current.addTimelineAnnotation("Built index updates, doing preStep"); TracingUtils.addAnnotation(current, "index update count", indexUpdates.size()); // no index updates, so we are done if (!indexUpdates.isEmpty()) { - this.writer.write(indexUpdates); + this.writer.write(indexUpdates, false); } } catch (Throwable t) { String msg = "Failed to update index with entries:" + indexUpdates; @@ -174,6 +196,34 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } } + @Override + public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, + Durability durability) throws IOException { + Map<String,byte[]> updateAttributes = put.getAttributesMap(); + PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(e.getEnvironment(),updateAttributes); + byte[] bs = indexMetaData.getAttributes().get(PhoenixIndexCodec.INDEX_UUID); + if (bs == null || localUpdates.get(Bytes.toLong(bs)) == null) { + super.prePut(e, put, edit, durability); + } else { + Collection<Pair<Mutation, byte[]>> localIndexUpdates = localUpdates.remove(Bytes.toLong(bs)); + this.writer.write(localIndexUpdates, true); + } + } + + @Override + public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, + WALEdit edit, Durability durability) throws IOException { + Map<String,byte[]> updateAttributes = delete.getAttributesMap(); + PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(e.getEnvironment(),updateAttributes); + byte[] bs = indexMetaData.getAttributes().get(PhoenixIndexCodec.INDEX_UUID); + if (bs == null || localUpdates.get(Bytes.toLong(bs)) == null) { + super.postDelete(e, delete, edit, durability); + } else { + Collection<Pair<Mutation, byte[]>> localIndexUpdates = localUpdates.remove(Bytes.toLong(bs)); + this.writer.write(localIndexUpdates, true); + } + } + private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) throws IOException { ResultScanner currentScanner = null; TransactionAwareHTable txTable = null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/6593f1af/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java index 6419e64..5843040 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java @@ -183,7 +183,7 @@ public abstract class ExplainTable { } } - private void appendPKColumnValue(StringBuilder buf, byte[] range, Boolean isNull, int slotIndex) { + private void appendPKColumnValue(StringBuilder buf, byte[] range, Boolean isNull, int slotIndex, boolean changeViewIndexId) { if (Boolean.TRUE.equals(isNull)) { buf.append("null"); return; @@ -205,8 +205,14 @@ public abstract class ExplainTable { type.coerceBytes(ptr, type, sortOrder, SortOrder.getDefault()); range = ptr.get(); } - Format formatter = context.getConnection().getFormatter(type); - buf.append(type.toStringLiteral(range, formatter)); + if (changeViewIndexId) { + Short s = (Short) type.toObject(range); + s = (short) (s + (-Short.MAX_VALUE)); + buf.append(s.toString()); + } else { + Format formatter = context.getConnection().getFormatter(type); + buf.append(type.toStringLiteral(range, formatter)); + } } private static class RowKeyValueIterator implements Iterator<byte[]> { @@ -264,6 +270,7 @@ public abstract class ExplainTable { minMaxIterator = new RowKeyValueIterator(schema, minMaxRange.getRange(bound)); } } + boolean isLocalIndex = ScanUtil.isLocalIndex(context.getScan()); boolean forceSkipScan = this.hint.hasHint(Hint.SKIP_SCAN); int nRanges = forceSkipScan ? scanRanges.getRanges().size() : scanRanges.getBoundSlotCount(); for (int i = 0, minPos = 0; minPos < nRanges || minMaxIterator.hasNext(); i++) { @@ -282,7 +289,13 @@ public abstract class ExplainTable { minMaxIterator = Iterators.emptyIterator(); } } - appendPKColumnValue(buf, b, isNull, i); + if (isLocalIndex + && ((context.getConnection().getTenantId() != null && i == 1) || (context + .getConnection().getTenantId() == null && i == 0))) { + appendPKColumnValue(buf, b, isNull, i, true); + } else { + appendPKColumnValue(buf, b, isNull, i, false); + } buf.append(','); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6593f1af/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java index faf20db..ad1b691 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java @@ -378,13 +378,8 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName); List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>(); for(PTable indexTable : table.getIndexes()){ - if (indexTable.getIndexType() == PTable.IndexType.LOCAL) { - indexTables.add(new TargetTableRef(indexTable.getName().getString(), - Bytes.toString(MetaDataUtil.getLocalIndexPhysicalName(table.getPhysicalName().getBytes())))); - } else { - indexTables.add( - new TargetTableRef(indexTable.getName().getString(), indexTable.getPhysicalName().getString())); - } + indexTables.add(new TargetTableRef(indexTable.getName().getString(), indexTable + .getPhysicalName().getString())); } return indexTables; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6593f1af/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index fc1292a..db8aba9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -206,8 +206,7 @@ public class IndexTool extends Configured implements Tool { // computed from the qDataTable name. String physicalIndexTable = pindexTable.getPhysicalName().getString(); if (IndexType.LOCAL.equals(pindexTable.getIndexType())) { - physicalIndexTable = Bytes - .toString(MetaDataUtil.getLocalIndexPhysicalName(pdataTable.getPhysicalName().getBytes())); + physicalIndexTable = qDataTable; } final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
