HBASE-12731 Heap occupancy based client pushback
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6d78b1b9 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6d78b1b9 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6d78b1b9 Branch: refs/heads/0.98 Commit: 6d78b1b97a777c655ace9b7e2189747ccc3163de Parents: aea3878 Author: Andrew Purtell <[email protected]> Authored: Thu Jan 22 17:33:21 2015 -0800 Committer: Andrew Purtell <[email protected]> Committed: Thu Jan 22 17:48:47 2015 -0800 ---------------------------------------------------------------------- .../backoff/ExponentialClientBackoffPolicy.java | 41 +++- .../hbase/client/backoff/ServerStatistics.java | 8 +- .../client/TestClientExponentialBackoff.java | 32 +++ .../org/apache/hadoop/hbase/HConstants.java | 7 + .../hbase/protobuf/generated/ClientProtos.java | 200 +++++++++++++++---- hbase-protocol/src/main/protobuf/Client.proto | 7 +- .../hadoop/hbase/regionserver/HRegion.java | 3 +- .../hbase/regionserver/HRegionServer.java | 15 ++ .../hbase/regionserver/HeapMemoryManager.java | 129 ++++++++++++ .../regionserver/RegionServerServices.java | 5 + .../hadoop/hbase/MockRegionServerServices.java | 6 + .../hadoop/hbase/master/MockRegionServer.java | 6 + 12 files changed, 416 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6d78b1b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java index 6e75670..5b1d3d2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java @@ -20,10 +20,13 @@ package org.apache.hadoop.hbase.client.backoff; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import com.google.common.base.Preconditions; + /** * Simple exponential backoff policy on for the client that uses a percent^4 times the * max backoff to generate the backoff time. @@ -38,9 +41,15 @@ public class ExponentialClientBackoffPolicy implements ClientBackoffPolicy { public static final long DEFAULT_MAX_BACKOFF = 5 * ONE_MINUTE; public static final String MAX_BACKOFF_KEY = "hbase.client.exponential-backoff.max"; private long maxBackoff; + private float heapOccupancyLowWatermark; + private float heapOccupancyHighWatermark; public ExponentialClientBackoffPolicy(Configuration conf) { this.maxBackoff = conf.getLong(MAX_BACKOFF_KEY, DEFAULT_MAX_BACKOFF); + this.heapOccupancyLowWatermark = conf.getFloat(HConstants.HEAP_OCCUPANCY_LOW_WATERMARK_KEY, + HConstants.DEFAULT_HEAP_OCCUPANCY_LOW_WATERMARK); + this.heapOccupancyHighWatermark = conf.getFloat(HConstants.HEAP_OCCUPANCY_HIGH_WATERMARK_KEY, + HConstants.DEFAULT_HEAP_OCCUPANCY_HIGH_WATERMARK); } @Override @@ -56,16 +65,40 @@ public class ExponentialClientBackoffPolicy implements ClientBackoffPolicy { return 0; } + // Factor in memstore load + double percent = regionStats.getMemstoreLoadPercent() / 100.0; + + // Factor in heap occupancy + float heapOccupancy = regionStats.getHeapOccupancyPercent() / 100.0f; + if (heapOccupancy >= heapOccupancyLowWatermark) { + // If we are higher than the high watermark, we are already applying max + // backoff and cannot scale more (see scale() below) + if 
(heapOccupancy > heapOccupancyHighWatermark) { + heapOccupancy = heapOccupancyHighWatermark; + } + percent = Math.max(percent, + scale(heapOccupancy, heapOccupancyLowWatermark, heapOccupancyHighWatermark, + 0.1, 1.0)); + } + // square the percent as a value less than 1. Closer we move to 100 percent, // the percent moves to 1, but squaring causes the exponential curve - double percent = regionStats.getMemstoreLoadPercent() / 100.0; double multiplier = Math.pow(percent, 4.0); - // shouldn't ever happen, but just incase something changes in the statistic data if (multiplier > 1) { - LOG.warn("Somehow got a backoff multiplier greater than the allowed backoff. Forcing back " + - "down to the max backoff"); multiplier = 1; } return (long) (multiplier * maxBackoff); } + + /** Scale valueIn in the range [baseMin,baseMax] to the range [limitMin,limitMax] */ + private static double scale(double valueIn, double baseMin, double baseMax, double limitMin, + double limitMax) { + Preconditions.checkArgument(baseMin <= baseMax, "Illegal source range [%s,%s]", + baseMin, baseMax); + Preconditions.checkArgument(limitMin <= limitMax, "Illegal target range [%s,%s]", + limitMin, limitMax); + Preconditions.checkArgument(valueIn >= baseMin && valueIn <= baseMax, + "Value %s must be within the range [%s,%s]", valueIn, baseMin, baseMax); + return ((limitMax - limitMin) * (valueIn - baseMin) / (baseMax - baseMin)) + limitMin; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/6d78b1b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java index a3b8e11..c7519be 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java @@ -54,15 +54,21 @@ public class ServerStatistics { return stats.get(regionName); } - public static class RegionStatistics{ + public static class RegionStatistics { private int memstoreLoad = 0; + private int heapOccupancy = 0; public void update(ClientProtos.RegionLoadStats currentStats) { this.memstoreLoad = currentStats.getMemstoreLoad(); + this.heapOccupancy = currentStats.getHeapOccupancy(); } public int getMemstoreLoadPercent(){ return this.memstoreLoad; } + + public int getHeapOccupancyPercent(){ + return this.heapOccupancy; + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6d78b1b9/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java index 24cb661..8c01fa0 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java @@ -101,10 +101,42 @@ public class TestClientExponentialBackoff { } } + @Test + public void testHeapOccupancyPolicy() { + Configuration conf = new Configuration(false); + ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf); + + ServerStatistics stats = new ServerStatistics(); + long backoffTime; + + update(stats, 0, 95); + backoffTime = backoff.getBackoffTime(server, regionname, stats); + assertTrue("Heap occupancy at low watermark had no effect", backoffTime > 0); + + long previous = backoffTime; + update(stats, 0, 96); + backoffTime = backoff.getBackoffTime(server, regionname, stats); + assertTrue("Increase above low watermark should have increased backoff", + backoffTime > 
previous); + + update(stats, 0, 98); + backoffTime = backoff.getBackoffTime(server, regionname, stats); + assertEquals("We should be using max backoff when at high watermark", backoffTime, + ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF); + } + private void update(ServerStatistics stats, int load) { ClientProtos.RegionLoadStats stat = ClientProtos.RegionLoadStats.newBuilder() .setMemstoreLoad (load).build(); stats.update(regionname, stat); } + + private void update(ServerStatistics stats, int memstoreLoad, int heapOccupancy) { + ClientProtos.RegionLoadStats stat = ClientProtos.RegionLoadStats.newBuilder() + .setMemstoreLoad(memstoreLoad) + .setHeapOccupancy(heapOccupancy) + .build(); + stats.update(regionname, stat); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6d78b1b9/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index c181ad5..b1508cc 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1042,6 +1042,13 @@ public final class HConstants { public static final String ENABLE_CLIENT_BACKPRESSURE = "hbase.client.backpressure.enabled"; public static final boolean DEFAULT_ENABLE_CLIENT_BACKPRESSURE = false; + public static final String HEAP_OCCUPANCY_LOW_WATERMARK_KEY = + "hbase.heap.occupancy.low_water_mark"; + public static final float DEFAULT_HEAP_OCCUPANCY_LOW_WATERMARK = 0.95f; + public static final String HEAP_OCCUPANCY_HIGH_WATERMARK_KEY = + "hbase.heap.occupancy.high_water_mark"; + public static final float DEFAULT_HEAP_OCCUPANCY_HIGH_WATERMARK = 0.98f; + private HConstants() { // Can't be instantiated with this ctor. 
} http://git-wip-us.apache.org/repos/asf/hbase/blob/6d78b1b9/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index fc776ea..6765ce6 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -25721,7 +25721,7 @@ public final class ClientProtos { * <code>optional int32 memstoreLoad = 1 [default = 0];</code> * * <pre> - * percent load on the memstore. Guaranteed to be positive, between 0 and 100 + * Percent load on the memstore. Guaranteed to be positive, between 0 and 100. * </pre> */ boolean hasMemstoreLoad(); @@ -25729,10 +25729,30 @@ public final class ClientProtos { * <code>optional int32 memstoreLoad = 1 [default = 0];</code> * * <pre> - * percent load on the memstore. Guaranteed to be positive, between 0 and 100 + * Percent load on the memstore. Guaranteed to be positive, between 0 and 100. * </pre> */ int getMemstoreLoad(); + + // optional int32 heapOccupancy = 2 [default = 0]; + /** + * <code>optional int32 heapOccupancy = 2 [default = 0];</code> + * + * <pre> + * Percent JVM heap occupancy. Guaranteed to be positive, between 0 and 100. + * We can move this to "ServerLoadStats" should we develop them. + * </pre> + */ + boolean hasHeapOccupancy(); + /** + * <code>optional int32 heapOccupancy = 2 [default = 0];</code> + * + * <pre> + * Percent JVM heap occupancy. Guaranteed to be positive, between 0 and 100. + * We can move this to "ServerLoadStats" should we develop them. 
+ * </pre> + */ + int getHeapOccupancy(); } /** * Protobuf type {@code RegionLoadStats} @@ -25795,6 +25815,11 @@ public final class ClientProtos { memstoreLoad_ = input.readInt32(); break; } + case 16: { + bitField0_ |= 0x00000002; + heapOccupancy_ = input.readInt32(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -25842,7 +25867,7 @@ public final class ClientProtos { * <code>optional int32 memstoreLoad = 1 [default = 0];</code> * * <pre> - * percent load on the memstore. Guaranteed to be positive, between 0 and 100 + * Percent load on the memstore. Guaranteed to be positive, between 0 and 100. * </pre> */ public boolean hasMemstoreLoad() { @@ -25852,15 +25877,42 @@ public final class ClientProtos { * <code>optional int32 memstoreLoad = 1 [default = 0];</code> * * <pre> - * percent load on the memstore. Guaranteed to be positive, between 0 and 100 + * Percent load on the memstore. Guaranteed to be positive, between 0 and 100. * </pre> */ public int getMemstoreLoad() { return memstoreLoad_; } + // optional int32 heapOccupancy = 2 [default = 0]; + public static final int HEAPOCCUPANCY_FIELD_NUMBER = 2; + private int heapOccupancy_; + /** + * <code>optional int32 heapOccupancy = 2 [default = 0];</code> + * + * <pre> + * Percent JVM heap occupancy. Guaranteed to be positive, between 0 and 100. + * We can move this to "ServerLoadStats" should we develop them. + * </pre> + */ + public boolean hasHeapOccupancy() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * <code>optional int32 heapOccupancy = 2 [default = 0];</code> + * + * <pre> + * Percent JVM heap occupancy. Guaranteed to be positive, between 0 and 100. + * We can move this to "ServerLoadStats" should we develop them. 
+ * </pre> + */ + public int getHeapOccupancy() { + return heapOccupancy_; + } + private void initFields() { memstoreLoad_ = 0; + heapOccupancy_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -25877,6 +25929,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000001) == 0x00000001)) { output.writeInt32(1, memstoreLoad_); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeInt32(2, heapOccupancy_); + } getUnknownFields().writeTo(output); } @@ -25890,6 +25945,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeInt32Size(1, memstoreLoad_); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(2, heapOccupancy_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -25918,6 +25977,11 @@ public final class ClientProtos { result = result && (getMemstoreLoad() == other.getMemstoreLoad()); } + result = result && (hasHeapOccupancy() == other.hasHeapOccupancy()); + if (hasHeapOccupancy()) { + result = result && (getHeapOccupancy() + == other.getHeapOccupancy()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -25935,6 +25999,10 @@ public final class ClientProtos { hash = (37 * hash) + MEMSTORELOAD_FIELD_NUMBER; hash = (53 * hash) + getMemstoreLoad(); } + if (hasHeapOccupancy()) { + hash = (37 * hash) + HEAPOCCUPANCY_FIELD_NUMBER; + hash = (53 * hash) + getHeapOccupancy(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -26051,6 +26119,8 @@ public final class ClientProtos { super.clear(); memstoreLoad_ = 0; bitField0_ = (bitField0_ & ~0x00000001); + heapOccupancy_ = 0; + bitField0_ = (bitField0_ & ~0x00000002); return this; } @@ -26083,6 +26153,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000001; } result.memstoreLoad_ = memstoreLoad_; + if 
(((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.heapOccupancy_ = heapOccupancy_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -26102,6 +26176,9 @@ public final class ClientProtos { if (other.hasMemstoreLoad()) { setMemstoreLoad(other.getMemstoreLoad()); } + if (other.hasHeapOccupancy()) { + setHeapOccupancy(other.getHeapOccupancy()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -26135,7 +26212,7 @@ public final class ClientProtos { * <code>optional int32 memstoreLoad = 1 [default = 0];</code> * * <pre> - * percent load on the memstore. Guaranteed to be positive, between 0 and 100 + * Percent load on the memstore. Guaranteed to be positive, between 0 and 100. * </pre> */ public boolean hasMemstoreLoad() { @@ -26145,7 +26222,7 @@ public final class ClientProtos { * <code>optional int32 memstoreLoad = 1 [default = 0];</code> * * <pre> - * percent load on the memstore. Guaranteed to be positive, between 0 and 100 + * Percent load on the memstore. Guaranteed to be positive, between 0 and 100. * </pre> */ public int getMemstoreLoad() { @@ -26155,7 +26232,7 @@ public final class ClientProtos { * <code>optional int32 memstoreLoad = 1 [default = 0];</code> * * <pre> - * percent load on the memstore. Guaranteed to be positive, between 0 and 100 + * Percent load on the memstore. Guaranteed to be positive, between 0 and 100. * </pre> */ public Builder setMemstoreLoad(int value) { @@ -26168,7 +26245,7 @@ public final class ClientProtos { * <code>optional int32 memstoreLoad = 1 [default = 0];</code> * * <pre> - * percent load on the memstore. Guaranteed to be positive, between 0 and 100 + * Percent load on the memstore. Guaranteed to be positive, between 0 and 100. 
* </pre> */ public Builder clearMemstoreLoad() { @@ -26178,6 +26255,59 @@ public final class ClientProtos { return this; } + // optional int32 heapOccupancy = 2 [default = 0]; + private int heapOccupancy_ ; + /** + * <code>optional int32 heapOccupancy = 2 [default = 0];</code> + * + * <pre> + * Percent JVM heap occupancy. Guaranteed to be positive, between 0 and 100. + * We can move this to "ServerLoadStats" should we develop them. + * </pre> + */ + public boolean hasHeapOccupancy() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * <code>optional int32 heapOccupancy = 2 [default = 0];</code> + * + * <pre> + * Percent JVM heap occupancy. Guaranteed to be positive, between 0 and 100. + * We can move this to "ServerLoadStats" should we develop them. + * </pre> + */ + public int getHeapOccupancy() { + return heapOccupancy_; + } + /** + * <code>optional int32 heapOccupancy = 2 [default = 0];</code> + * + * <pre> + * Percent JVM heap occupancy. Guaranteed to be positive, between 0 and 100. + * We can move this to "ServerLoadStats" should we develop them. + * </pre> + */ + public Builder setHeapOccupancy(int value) { + bitField0_ |= 0x00000002; + heapOccupancy_ = value; + onChanged(); + return this; + } + /** + * <code>optional int32 heapOccupancy = 2 [default = 0];</code> + * + * <pre> + * Percent JVM heap occupancy. Guaranteed to be positive, between 0 and 100. + * We can move this to "ServerLoadStats" should we develop them. 
+ * </pre> + */ + public Builder clearHeapOccupancy() { + bitField0_ = (bitField0_ & ~0x00000002); + heapOccupancy_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:RegionLoadStats) } @@ -31422,32 +31552,32 @@ public final class ClientProtos { "(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027.Coproce", "ssorServiceCall\"Y\n\014RegionAction\022 \n\006regio" + "n\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomic\030\002 \001(" + - "\010\022\027\n\006action\030\003 \003(\0132\007.Action\"*\n\017RegionLoad" + - "Stats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\"\266\001\n\021Resul" + - "tOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 " + - "\001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016.NameBy" + - "tesPair\0221\n\016service_result\030\004 \001(\0132\031.Coproc" + - "essorServiceResult\022#\n\tloadStats\030\005 \001(\0132\020." + - "RegionLoadStats\"f\n\022RegionActionResult\022-\n" + - "\021resultOrException\030\001 \003(\0132\022.ResultOrExcep", - "tion\022!\n\texception\030\002 \001(\0132\016.NameBytesPair\"" + - "f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\0132\r." 
+ - "RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcond" + - "ition\030\003 \001(\0132\n.Condition\"S\n\rMultiResponse" + - "\022/\n\022regionActionResult\030\001 \003(\0132\023.RegionAct" + - "ionResult\022\021\n\tprocessed\030\002 \001(\0102\205\003\n\rClientS" + - "ervice\022 \n\003Get\022\013.GetRequest\032\014.GetResponse" + - "\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateRespo" + - "nse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanResponse" + - "\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileRequest", - "\032\026.BulkLoadHFileResponse\022F\n\013ExecService\022" + - "\032.CoprocessorServiceRequest\032\033.Coprocesso" + - "rServiceResponse\022R\n\027ExecRegionServerServ" + - "ice\022\032.CoprocessorServiceRequest\032\033.Coproc" + - "essorServiceResponse\022&\n\005Multi\022\r.MultiReq" + - "uest\032\016.MultiResponseBB\n*org.apache.hadoo" + - "p.hbase.protobuf.generatedB\014ClientProtos" + - "H\001\210\001\001\240\001\001" + "\010\022\027\n\006action\030\003 \003(\0132\007.Action\"D\n\017RegionLoad" + + "Stats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOc" + + "cupancy\030\002 \001(\005:\0010\"\266\001\n\021ResultOrException\022\r" + + "\n\005index\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!" + + "\n\texception\030\003 \001(\0132\016.NameBytesPair\0221\n\016ser" + + "vice_result\030\004 \001(\0132\031.CoprocessorServiceRe" + + "sult\022#\n\tloadStats\030\005 \001(\0132\020.RegionLoadStat" + + "s\"f\n\022RegionActionResult\022-\n\021resultOrExcep", + "tion\030\001 \003(\0132\022.ResultOrException\022!\n\texcept" + + "ion\030\002 \001(\0132\016.NameBytesPair\"f\n\014MultiReques" + + "t\022#\n\014regionAction\030\001 \003(\0132\r.RegionAction\022\022" + + "\n\nnonceGroup\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n." 
+ + "Condition\"S\n\rMultiResponse\022/\n\022regionActi" + + "onResult\030\001 \003(\0132\023.RegionActionResult\022\021\n\tp" + + "rocessed\030\002 \001(\0102\205\003\n\rClientService\022 \n\003Get\022" + + "\013.GetRequest\032\014.GetResponse\022)\n\006Mutate\022\016.M" + + "utateRequest\032\017.MutateResponse\022#\n\004Scan\022\014." + + "ScanRequest\032\r.ScanResponse\022>\n\rBulkLoadHF", + "ile\022\025.BulkLoadHFileRequest\032\026.BulkLoadHFi" + + "leResponse\022F\n\013ExecService\022\032.CoprocessorS" + + "erviceRequest\032\033.CoprocessorServiceRespon" + + "se\022R\n\027ExecRegionServerService\022\032.Coproces" + + "sorServiceRequest\032\033.CoprocessorServiceRe" + + "sponse\022&\n\005Multi\022\r.MultiRequest\032\016.MultiRe" + + "sponseBB\n*org.apache.hadoop.hbase.protob" + + "uf.generatedB\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -31609,7 +31739,7 @@ public final class ClientProtos { internal_static_RegionLoadStats_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RegionLoadStats_descriptor, - new java.lang.String[] { "MemstoreLoad", }); + new java.lang.String[] { "MemstoreLoad", "HeapOccupancy", }); internal_static_ResultOrException_descriptor = getDescriptor().getMessageTypes().get(23); internal_static_ResultOrException_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/hbase/blob/6d78b1b9/hbase-protocol/src/main/protobuf/Client.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index a648b1a..5b6a0fd 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -341,9 +341,12 @@ message RegionAction { /* * Statistics about the current 
load on the region */ -message RegionLoadStats{ - // percent load on the memstore. Guaranteed to be positive, between 0 and 100 +message RegionLoadStats { + // Percent load on the memstore. Guaranteed to be positive, between 0 and 100. optional int32 memstoreLoad = 1 [default = 0]; + // Percent JVM heap occupancy. Guaranteed to be positive, between 0 and 100. + // We can move this to "ServerLoadStats" should we develop them. + optional int32 heapOccupancy = 2 [default = 0]; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/6d78b1b9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index ce23111..5dabf8d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5061,7 +5061,8 @@ public class HRegion implements HeapSize { // , Writable{ } ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder(); stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this - .memstoreFlushSize))); + .memstoreFlushSize))); + stats.setHeapOccupancy((int)(rsServices.getHeapMemoryManager().getHeapOccupancyPercent()*100)); return stats.build(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6d78b1b9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 532d1eb..08e714d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -299,6 +299,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa // Cache flushing protected MemStoreFlusher cacheFlusher; + protected HeapMemoryManager hMemManager; + // catalog tracker protected CatalogTracker catalogTracker; @@ -982,6 +984,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa // Send interrupts to wake up threads if sleeping so they notice shutdown. // TODO: Should we check they are alive? If OOME could have exited already + if (this.hMemManager != null) this.hMemManager.stop(); if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary(); if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary(); if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary(); @@ -1323,6 +1326,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration()); startServiceThreads(); + startHeapMemoryManager(); LOG.info("Serving as " + this.serverNameFromMasterPOV + ", RpcServer on " + this.isa + ", sessionid=0x" + @@ -1338,6 +1342,13 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } } + private void startHeapMemoryManager() { + this.hMemManager = HeapMemoryManager.create(this); + if (this.hMemManager != null) { + this.hMemManager.start(); + } + } + private void createMyEphemeralNode() throws KeeperException, IOException { byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray()); ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, @@ -4924,4 +4935,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa return this.cacheConfig; } + @Override + public HeapMemoryManager getHeapMemoryManager() { + return hMemManager; + } } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d78b1b9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java new file mode 100644 index 0000000..1960781 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java @@ -0,0 +1,129 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryUsage; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.util.Threads; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Manages heap memory related tasks. + */ +@InterfaceAudience.Private +public class HeapMemoryManager { + private static final Log LOG = LogFactory.getLog(HeapMemoryManager.class); + + // keep the same period tunable as branch-1 and higher for compatibility + public static final String HBASE_RS_HEAP_MEMORY_TUNER_PERIOD = + "hbase.regionserver.heapmemory.tuner.period"; + public static final int HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD = 60 * 1000; + + private volatile float heapOccupancyPercent; + + private Server server; + private HeapMemoryChore heapMemChore = null; + private final int defaultChorePeriod; + private final float heapOccupancyLowWatermark; + + public static HeapMemoryManager create(Server server) { + return new HeapMemoryManager(server); + } + + @VisibleForTesting + HeapMemoryManager(Server server) { + Configuration conf = server.getConfiguration(); + this.server = server; + this.defaultChorePeriod = conf.getInt(HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, + HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD); + this.heapOccupancyLowWatermark = conf.getFloat(HConstants.HEAP_OCCUPANCY_LOW_WATERMARK_KEY, + HConstants.DEFAULT_HEAP_OCCUPANCY_LOW_WATERMARK); + } + + public void start() { + this.heapMemChore = new HeapMemoryChore(); + Threads.setDaemonThreadRunning(heapMemChore.getThread()); + } + + public void stop() { + // The chore thread is a daemon; interrupt it if start() ever created it.
+ if (this.heapMemChore != null) this.heapMemChore.interrupt(); + } + + /** + * @return heap occupancy as a fraction (used / committed heap), 0 <= n <= 1 + */ + public float getHeapOccupancyPercent() { + return this.heapOccupancyPercent; + } + + private class HeapMemoryChore extends Chore { + private boolean alarming = false; + + public HeapMemoryChore() { + super(server.getServerName() + "-HeapMemoryChore", defaultChorePeriod, server); + } + + @Override + protected void sleep() { + if (!alarming) { + super.sleep(); + } else { + // we are in the alarm state, so sleep only for a short fixed period + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Interrupted, propagate + Thread.currentThread().interrupt(); + } + } + } + + @Override + protected void chore() { + // Sample heap occupancy + MemoryUsage memUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); + heapOccupancyPercent = (float)memUsage.getUsed() / (float)memUsage.getCommitted(); + // If we are above the heap occupancy alarm low watermark, sound the alarm + if (heapOccupancyPercent >= heapOccupancyLowWatermark) { + if (!alarming) { + LOG.warn("heapOccupancyPercent " + heapOccupancyPercent + + " is above heap occupancy alarm watermark (" + heapOccupancyLowWatermark + ")"); + alarming = true; + } + } else { + if (alarming) { + LOG.info("heapOccupancyPercent " + heapOccupancyPercent + + " is now below the heap occupancy alarm watermark (" + + heapOccupancyLowWatermark + ")"); + alarming = false; + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6d78b1b9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index e2ec169..2a00473 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -142,4 +142,9 @@ public interface RegionServerServices * @return {@code true} if the registration was successful, {@code false} */ boolean registerService(Service service); + + /** + * @return heap memory manager instance + */ + HeapMemoryManager getHeapMemoryManager(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6d78b1b9/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index 1f87f35..7369a30 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio import org.apache.hadoop.hbase.regionserver.CompactionRequestor; import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager; import org.apache.hadoop.hbase.regionserver.Leases; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -248,4 +249,9 @@ class MockRegionServerServices implements RegionServerServices { // TODO Auto-generated method stub return false; } + + @Override + public HeapMemoryManager getHeapMemoryManager() { + return null; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6d78b1b9/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 7a14a01..5390d85 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio import org.apache.hadoop.hbase.regionserver.CompactionRequestor; import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager; import org.apache.hadoop.hbase.regionserver.Leases; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -588,4 +589,9 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { // TODO Auto-generated method stub return null; } + + @Override + public HeapMemoryManager getHeapMemoryManager() { + return null; + } }
