This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 533c84d HBASE-25739 TableSkewCostFunction need to use aggregated deviation (#3067)
533c84d is described below
commit 533c84d330f7314cca4f2f14b4cf6b8454eafb0a
Author: clarax <[email protected]>
AuthorDate: Thu Apr 15 13:12:07 2021 -0700
HBASE-25739 TableSkewCostFunction need to use aggregated deviation (#3067)
Signed-off-by: Michael Stack <[email protected]>
Reviewed-by: David Manning <[email protected]>
---
.../hbase/master/balancer/BaseLoadBalancer.java | 120 +++++++++++++++++----
.../master/balancer/StochasticLoadBalancer.java | 101 ++++++-----------
.../master/balancer/TestBaseLoadBalancer.java | 4 +-
3 files changed, 131 insertions(+), 94 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 1466570..b411636 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -162,7 +162,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex
(initial cluster state)
int[] regionIndexToTableIndex; //regionIndex -> tableIndex
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> #
regions
- int[] numMaxRegionsPerTable; //tableIndex -> max number of regions
in a single RS
+ int[] numRegionsPerTable; // tableIndex -> region count
+ double[] meanRegionsPerTable; // mean region count per table
+ double regionSkewByTable; // skew on RS per by table
+ double minRegionSkewByTable; // min skew on RS per by table
+ double maxRegionSkewByTable; // max skew on RS per by table
+
int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the
primary
boolean hasRegionReplicas = false; //whether there is regions with
replicas
@@ -370,6 +375,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
numTables = tables.size();
numRegionsPerServerPerTable = new int[numServers][numTables];
+ numRegionsPerTable = new int[numTables];
for (int i = 0; i < numServers; i++) {
for (int j = 0; j < numTables; j++) {
@@ -377,21 +383,29 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
}
+ for (int i = 0; i < numTables; i++) {
+ numRegionsPerTable[i] = 0;
+ }
+
for (int i=0; i < regionIndexToServerIndex.length; i++) {
if (regionIndexToServerIndex[i] >= 0) {
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
+ numRegionsPerTable[regionIndexToTableIndex[i]]++;
}
}
- numMaxRegionsPerTable = new int[numTables];
- for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
- for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length;
tableIndex++) {
- if (aNumRegionsPerServerPerTable[tableIndex] >
numMaxRegionsPerTable[tableIndex]) {
- numMaxRegionsPerTable[tableIndex] =
aNumRegionsPerServerPerTable[tableIndex];
- }
- }
+ // Avoid repeated computation for planning
+ meanRegionsPerTable = new double[numTables];
+ maxRegionSkewByTable = 0;
+ minRegionSkewByTable = 0;
+ for (int i = 0; i < numTables; i++) {
+ meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) /
numServers;
+ minRegionSkewByTable += Cluster.getMinSkew(numRegionsPerTable[i],
numServers);
+ maxRegionSkewByTable += Cluster.getMaxSkew(numRegionsPerTable[i],
numServers);
}
+ computeRegionSkewPerTable();
+
for (int i = 0; i < regions.length; i ++) {
RegionInfo info = regions[i];
if (RegionReplicaUtil.isDefaultReplica(info)) {
@@ -517,6 +531,53 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
/**
+ * Return the min skew of distribution
+ */
+ public static double getMinSkew(double total, double numServers) {
+ double mean = total / numServers;
+ // It's possible that there aren't enough regions to go around
+ double min;
+ if (numServers > total) {
+ min = ((numServers - total) * mean + (1 - mean) * total) ;
+ } else {
+ // Some will have 1 more than everything else.
+ int numHigh = (int) (total - (Math.floor(mean) * numServers));
+ int numLow = (int) (numServers - numHigh);
+ min = numHigh * (Math.ceil(mean) - mean) + numLow * (mean -
Math.floor(mean));
+ }
+ return min;
+ }
+
+ /**
+ * Return the max deviation of distribution
+ * Compute max as if all region servers had 0 and one had the sum of all costs. This must be
+ * a zero sum cost for this to make sense.
+ */
+ public static double getMaxSkew(double total, double numServers) {
+ double mean = total / numServers;
+ return (total - mean) + (numServers - 1) * mean;
+ }
+
+ /**
+ * Scale the value between 0 and 1.
+ *
+ * @param min Min value
+ * @param max The Max value
+ * @param value The value to be scaled.
+ * @return The scaled value.
+ */
+ public static double scale(double min, double max, double value) {
+ if (max <= min || value <= min) {
+ return 0;
+ }
+ if ((max - min) == 0) {
+ return 0;
+ }
+
+ return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
+ }
+
+ /**
* Retrieves and lazily initializes a field storing the locality of
* every region/server combination
*/
@@ -574,6 +635,21 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
/**
+ * Recompute the region skew during init or plan of moves.
+ */
+ private void computeRegionSkewPerTable() {
+ // reinitialize for recomputation
+ regionSkewByTable = 0;
+
+ for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
+ for (int tableIndex = 0; tableIndex <
aNumRegionsPerServerPerTable.length; tableIndex++) {
+ regionSkewByTable +=
Math.abs(aNumRegionsPerServerPerTable[tableIndex]
+ - meanRegionsPerTable[tableIndex]);
+ }
+ }
+ }
+
+ /**
* Computes and caches the locality for each region/rack combinations,
* as well as storing a mapping of region -> server and region -> rack such that server
* and rack have the highest locality for region
@@ -828,22 +904,20 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
int tableIndex = regionIndexToTableIndex[region];
if (oldServer >= 0) {
numRegionsPerServerPerTable[oldServer][tableIndex]--;
+ // update regionSkewPerTable for the move from old server
+ regionSkewByTable +=
+ Math.abs(numRegionsPerServerPerTable[oldServer][tableIndex]
+ - meanRegionsPerTable[tableIndex])
+ - Math.abs(numRegionsPerServerPerTable[oldServer][tableIndex] + 1
+ - meanRegionsPerTable[tableIndex]);
}
numRegionsPerServerPerTable[newServer][tableIndex]++;
-
- //check whether this caused maxRegionsPerTable in the new Server to be
updated
- if (numRegionsPerServerPerTable[newServer][tableIndex] >
numMaxRegionsPerTable[tableIndex]) {
- numMaxRegionsPerTable[tableIndex] =
numRegionsPerServerPerTable[newServer][tableIndex];
- } else if (oldServer >= 0 &&
(numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
- == numMaxRegionsPerTable[tableIndex]) {
- //recompute maxRegionsPerTable since the previous value was coming
from the old server
- numMaxRegionsPerTable[tableIndex] = 0;
- for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable)
{
- if (aNumRegionsPerServerPerTable[tableIndex] >
numMaxRegionsPerTable[tableIndex]) {
- numMaxRegionsPerTable[tableIndex] =
aNumRegionsPerServerPerTable[tableIndex];
- }
- }
- }
+ // update regionSkewPerTable for the move to new server
+ regionSkewByTable +=
+ Math.abs(numRegionsPerServerPerTable[newServer][tableIndex]
+ - meanRegionsPerTable[tableIndex])
+ - Math.abs(numRegionsPerServerPerTable[newServer][tableIndex] - 1
+ - meanRegionsPerTable[tableIndex]);
// update for servers
int primary = regionIndexToPrimaryIndex[region];
@@ -1013,7 +1087,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
.append(Arrays.toString(serverIndicesSortedByRegionCount))
.append(",
regionsPerServer=").append(Arrays.deepToString(regionsPerServer));
- desc.append(",
numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
+ desc.append(", regionSkewByTable=").append(regionSkewByTable)
.append(", numRegions=").append(numRegions).append(",
numServers=").append(numServers)
.append(", numTables=").append(numTables).append(",
numMovedRegions=")
.append(numMovedRegions).append('}');
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index b40779a..e8a7788 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -762,6 +762,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
boolean isNeeded() {
return true;
}
+
float getMultiplier() {
return multiplier;
}
@@ -770,35 +771,39 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
this.multiplier = m;
}
- /** Called once per LB invocation to give the cost function
+ /**
+ * Called once per LB invocation to give the cost function
* to initialize it's state, and perform any costly calculation.
*/
void init(Cluster cluster) {
this.cluster = cluster;
}
- /** Called once per cluster Action to give the cost function
+ /**
+ * Called once per cluster Action to give the cost function
* an opportunity to update it's state. postAction() is always
* called at least once before cost() is called with the cluster
- * that this action is performed on. */
+ * that this action is performed on.
+ */
void postAction(Action action) {
switch (action.type) {
- case NULL: break;
- case ASSIGN_REGION:
- AssignRegionAction ar = (AssignRegionAction) action;
- regionMoved(ar.region, -1, ar.server);
- break;
- case MOVE_REGION:
- MoveRegionAction mra = (MoveRegionAction) action;
- regionMoved(mra.region, mra.fromServer, mra.toServer);
- break;
- case SWAP_REGIONS:
- SwapRegionsAction a = (SwapRegionsAction) action;
- regionMoved(a.fromRegion, a.fromServer, a.toServer);
- regionMoved(a.toRegion, a.toServer, a.fromServer);
- break;
- default:
- throw new RuntimeException("Uknown action:" + action.type);
+ case NULL:
+ break;
+ case ASSIGN_REGION:
+ AssignRegionAction ar = (AssignRegionAction) action;
+ regionMoved(ar.region, -1, ar.server);
+ break;
+ case MOVE_REGION:
+ MoveRegionAction mra = (MoveRegionAction) action;
+ regionMoved(mra.region, mra.fromServer, mra.toServer);
+ break;
+ case SWAP_REGIONS:
+ SwapRegionsAction a = (SwapRegionsAction) action;
+ regionMoved(a.fromRegion, a.fromServer, a.toServer);
+ regionMoved(a.toRegion, a.toServer, a.fromServer);
+ break;
+ default:
+ throw new RuntimeException("Uknown action:" + action.type);
}
}
@@ -822,59 +827,25 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
double total = getSum(stats);
double count = stats.length;
- double mean = total/count;
-
- // Compute max as if all region servers had 0 and one had the sum of all costs. This must be
- // a zero sum cost for this to make sense.
- double max = ((count - 1) * mean) + (total - mean);
-
- // It's possible that there aren't enough regions to go around
- double min;
- if (count > total) {
- min = ((count - total) * mean) + ((1 - mean) * total);
- } else {
- // Some will have 1 more than everything else.
- int numHigh = (int) (total - (Math.floor(mean) * count));
- int numLow = (int) (count - numHigh);
-
- min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean -
Math.floor(mean)));
+ double mean = total / count;
- }
- min = Math.max(0, min);
- for (int i=0; i<stats.length; i++) {
+ for (int i = 0; i < stats.length; i++) {
double n = stats[i];
double diff = Math.abs(mean - n);
totalCost += diff;
}
- double scaled = scale(min, max, totalCost);
- return scaled;
+ return Cluster
+ .scale(Cluster.getMinSkew(total, count), Cluster.getMaxSkew(total,
count), totalCost);
}
private double getSum(double[] stats) {
double total = 0;
- for(double s:stats) {
+ for (double s : stats) {
total += s;
}
return total;
}
-
- /**
- * Scale the value between 0 and 1.
- *
- * @param min Min value
- * @param max The Max value
- * @param value The value to be scaled.
- * @return The scaled value.
- */
- protected double scale(double min, double max, double value) {
- if (max <= min || value <= min) {
- return 0;
- }
- if ((max - min) == 0) return 0;
-
- return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
- }
}
/**
@@ -927,7 +898,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return 1000000; // return a number much greater than any of the
other cost
}
- return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
+ return Cluster.scale(0, Math.min(cluster.numRegions, maxMoves),
moveCost);
}
}
@@ -1035,15 +1006,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
@Override
protected double cost() {
- double max = cluster.numRegions;
- double min = ((double) cluster.numRegions) / cluster.numServers;
- double value = 0;
-
- for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
- value += cluster.numMaxRegionsPerTable[i];
- }
-
- return scale(min, max, value);
+ return Cluster.scale(cluster.minRegionSkewByTable,
cluster.maxRegionSkewByTable, cluster.regionSkewByTable);
}
}
@@ -1366,7 +1329,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
for (int i = 0 ; i < costsPerGroup.length; i++) {
totalCost += costsPerGroup[i];
}
- return scale(0, maxCost, totalCost);
+ return Cluster.scale(0, maxCost, totalCost);
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
index fdac676..f82a96a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
@@ -390,8 +390,8 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
// now move region1 from servers[0] to servers[2]
cluster.doAction(new MoveRegionAction(0, 0, 2));
- // check that the numMaxRegionsPerTable for "table" has increased to 2
- assertEquals(2, cluster.numMaxRegionsPerTable[0]);
+ // check that the regionSkewByTable for "table" has increased to 2
+ assertEquals(2, cluster.regionSkewByTable, 0.01);
// now repeat check whether moving region1 from servers[1] to servers[2]
// would lower availability
assertTrue(cluster.wouldLowerAvailability(hri1, servers[2]));