HBASE-14822 Renewing leases of scanners doesn't work.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/86a417ee Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/86a417ee Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/86a417ee Branch: refs/heads/hbase-12439 Commit: 86a417eeade1fc1b0a7799a48694e6f1d9b4cb1c Parents: cdca22a Author: Lars Hofhansl <[email protected]> Authored: Sat Dec 19 09:55:03 2015 -0800 Committer: Lars Hofhansl <[email protected]> Committed: Sat Dec 19 09:55:03 2015 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/ClientScanner.java | 4 +- .../hadoop/hbase/client/ScannerCallable.java | 12 +- .../client/ScannerCallableWithReplicas.java | 4 + .../hadoop/hbase/protobuf/RequestConverter.java | 4 +- .../hbase/protobuf/generated/ClientProtos.java | 218 +++++++++++++------ hbase-protocol/src/main/protobuf/Client.proto | 1 + .../hbase/regionserver/RSRpcServices.java | 7 + .../hbase/client/TestFromClientSide3.java | 25 --- .../hadoop/hbase/client/TestLeaseRenewal.java | 125 +++++++++++ 9 files changed, 307 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/86a417ee/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index a4514bf..1658e5b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -776,13 +776,13 @@ public abstract class ClientScanner extends AbstractClientScanner { public boolean renewLease() { if (callable != null) { // do not return any rows, do not advance the scanner - 
callable.setCaching(0); + callable.setRenew(true); try { this.caller.callWithoutRetries(callable, this.scannerTimeout); } catch (Exception e) { return false; } finally { - callable.setCaching(this.caching); + callable.setRenew(false); } return true; } http://git-wip-us.apache.org/repos/asf/hbase/blob/86a417ee/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 65f74c8..f6445a6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -71,6 +71,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { protected long scannerId = -1L; protected boolean instantiated = false; protected boolean closed = false; + protected boolean renew = false; private Scan scan; private int caching = 1; protected final ClusterConnection cConnection; @@ -206,7 +207,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { incRPCcallsMetrics(); request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, - this.scanMetrics != null); + this.scanMetrics != null, renew); ScanResponse response = null; controller = controllerFactory.newController(); controller.setPriority(getTableName()); @@ -411,6 +412,15 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { } /** + * Indicate whether we make a call only to renew the lease, but without affecting the scanner in + * any other way. 
+ * @param val true if only the lease should be renewed + */ + public void setRenew(boolean val) { + this.renew = val; + } + + /** * @return the HRegionInfo for the current region */ @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/86a417ee/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 97d7d41..a197e90 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -98,6 +98,10 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { currentScannerCallable.setClose(); } + public void setRenew(boolean val) { + currentScannerCallable.setRenew(val); + } + public void setCaching(int caching) { currentScannerCallable.setCaching(caching); } http://git-wip-us.apache.org/repos/asf/hbase/blob/86a417ee/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index ada510b..bd4c427 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -497,7 +497,8 @@ public final class RequestConverter { * @return a scan request */ public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows, - final boolean closeScanner, final long nextCallSeq, final boolean trackMetrics) { + final 
boolean closeScanner, final long nextCallSeq, final boolean trackMetrics, + final boolean renew) { ScanRequest.Builder builder = ScanRequest.newBuilder(); builder.setNumberOfRows(numberOfRows); builder.setCloseScanner(closeScanner); @@ -506,6 +507,7 @@ public final class RequestConverter { builder.setClientHandlesPartials(true); builder.setClientHandlesHeartbeats(true); builder.setTrackScanMetrics(trackMetrics); + builder.setRenew(renew); return builder.build(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/86a417ee/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 5e17ad5..d188531 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -17171,6 +17171,16 @@ public final class ClientProtos { * <code>optional bool track_scan_metrics = 9;</code> */ boolean getTrackScanMetrics(); + + // optional bool renew = 10 [default = false]; + /** + * <code>optional bool renew = 10 [default = false];</code> + */ + boolean hasRenew(); + /** + * <code>optional bool renew = 10 [default = false];</code> + */ + boolean getRenew(); } /** * Protobuf type {@code hbase.pb.ScanRequest} @@ -17297,6 +17307,11 @@ public final class ClientProtos { trackScanMetrics_ = input.readBool(); break; } + case 80: { + bitField0_ |= 0x00000200; + renew_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -17493,6 +17508,22 @@ public final class ClientProtos { return trackScanMetrics_; } + // optional bool renew = 10 [default = false]; + public static final int RENEW_FIELD_NUMBER = 10; + private 
boolean renew_; + /** + * <code>optional bool renew = 10 [default = false];</code> + */ + public boolean hasRenew() { + return ((bitField0_ & 0x00000200) == 0x00000200); + } + /** + * <code>optional bool renew = 10 [default = false];</code> + */ + public boolean getRenew() { + return renew_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance(); @@ -17503,6 +17534,7 @@ public final class ClientProtos { clientHandlesPartials_ = false; clientHandlesHeartbeats_ = false; trackScanMetrics_ = false; + renew_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -17555,6 +17587,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000100) == 0x00000100)) { output.writeBool(9, trackScanMetrics_); } + if (((bitField0_ & 0x00000200) == 0x00000200)) { + output.writeBool(10, renew_); + } getUnknownFields().writeTo(output); } @@ -17600,6 +17635,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(9, trackScanMetrics_); } + if (((bitField0_ & 0x00000200) == 0x00000200)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(10, renew_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -17668,6 +17707,11 @@ public final class ClientProtos { result = result && (getTrackScanMetrics() == other.getTrackScanMetrics()); } + result = result && (hasRenew() == other.hasRenew()); + if (hasRenew()) { + result = result && (getRenew() + == other.getRenew()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -17717,6 +17761,10 @@ public final class ClientProtos { hash = (37 * hash) + TRACK_SCAN_METRICS_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getTrackScanMetrics()); } + if (hasRenew()) { + hash = (37 * hash) + 
RENEW_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getRenew()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -17867,6 +17915,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000080); trackScanMetrics_ = false; bitField0_ = (bitField0_ & ~0x00000100); + renew_ = false; + bitField0_ = (bitField0_ & ~0x00000200); return this; } @@ -17939,6 +17989,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000100; } result.trackScanMetrics_ = trackScanMetrics_; + if (((from_bitField0_ & 0x00000200) == 0x00000200)) { + to_bitField0_ |= 0x00000200; + } + result.renew_ = renew_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -17982,6 +18036,9 @@ public final class ClientProtos { if (other.hasTrackScanMetrics()) { setTrackScanMetrics(other.getTrackScanMetrics()); } + if (other.hasRenew()) { + setRenew(other.getRenew()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -18486,6 +18543,39 @@ public final class ClientProtos { return this; } + // optional bool renew = 10 [default = false]; + private boolean renew_ ; + /** + * <code>optional bool renew = 10 [default = false];</code> + */ + public boolean hasRenew() { + return ((bitField0_ & 0x00000200) == 0x00000200); + } + /** + * <code>optional bool renew = 10 [default = false];</code> + */ + public boolean getRenew() { + return renew_; + } + /** + * <code>optional bool renew = 10 [default = false];</code> + */ + public Builder setRenew(boolean value) { + bitField0_ |= 0x00000200; + renew_ = value; + onChanged(); + return this; + } + /** + * <code>optional bool renew = 10 [default = false];</code> + */ + public Builder clearRenew() { + bitField0_ = (bitField0_ & ~0x00000200); + renew_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.ScanRequest) } @@ -34156,76 +34246,76 @@ public final class ClientProtos { " 
\001(\0162\025.hbase.pb.Consistency:\006STRONG\022\017\n\007c" + "aching\030\021 \001(\r\022\035\n\025allow_partial_results\030\022 " + "\001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.pb.Co" + - "lumnFamilyTimeRange\"\220\002\n\013ScanRequest\022)\n\006r" + + "lumnFamilyTimeRange\"\246\002\n\013ScanRequest\022)\n\006r" + "egion\030\001 \001(\0132\031.hbase.pb.RegionSpecifier\022\034", "\n\004scan\030\002 \001(\0132\016.hbase.pb.Scan\022\022\n\nscanner_" + "id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclos" + "e_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037" + "\n\027client_handles_partials\030\007 \001(\010\022!\n\031clien" + "t_handles_heartbeats\030\010 \001(\010\022\032\n\022track_scan" + - "_metrics\030\t \001(\010\"\232\002\n\014ScanResponse\022\030\n\020cells" + - "_per_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n" + - "\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007resu" + - "lts\030\005 \003(\0132\020.hbase.pb.Result\022\r\n\005stale\030\006 \001" + - "(\010\022\037\n\027partial_flag_per_result\030\007 \003(\010\022\036\n\026m", - "ore_results_in_region\030\010 \001(\010\022\031\n\021heartbeat" + - "_message\030\t \001(\010\022+\n\014scan_metrics\030\n \001(\0132\025.h" + - "base.pb.ScanMetrics\"\305\001\n\024BulkLoadHFileReq" + - "uest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSp" + - "ecifier\022>\n\013family_path\030\002 \003(\0132).hbase.pb." 
+ - "BulkLoadHFileRequest.FamilyPath\022\026\n\016assig" + - "n_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030" + - "\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResp" + - "onse\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorServi" + - "ceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(", - "\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"" + - "B\n\030CoprocessorServiceResult\022&\n\005value\030\001 \001" + - "(\0132\027.hbase.pb.NameBytesPair\"v\n\031Coprocess" + - "orServiceRequest\022)\n\006region\030\001 \002(\0132\031.hbase" + - ".pb.RegionSpecifier\022.\n\004call\030\002 \002(\0132 .hbas" + - "e.pb.CoprocessorServiceCall\"o\n\032Coprocess" + - "orServiceResponse\022)\n\006region\030\001 \002(\0132\031.hbas" + - "e.pb.RegionSpecifier\022&\n\005value\030\002 \002(\0132\027.hb" + - "ase.pb.NameBytesPair\"\226\001\n\006Action\022\r\n\005index" + - "\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.Muta", - "tionProto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n" + - "\014service_call\030\004 \001(\0132 .hbase.pb.Coprocess" + - "orServiceCall\"k\n\014RegionAction\022)\n\006region\030" + - "\001 \002(\0132\031.hbase.pb.RegionSpecifier\022\016\n\006atom" + - "ic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Acti" + - "on\"c\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 " + - "\001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022comp" + - "actionPressure\030\003 \001(\005:\0010\"\332\001\n\021ResultOrExce" + - "ption\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.h" + - "base.pb.Result\022*\n\texception\030\003 \001(\0132\027.hbas", - "e.pb.NameBytesPair\022:\n\016service_result\030\004 \001" + - "(\0132\".hbase.pb.CoprocessorServiceResult\022," + - "\n\tloadStats\030\005 \001(\0132\031.hbase.pb.RegionLoadS" + - 
"tats\"x\n\022RegionActionResult\0226\n\021resultOrEx" + - "ception\030\001 \003(\0132\033.hbase.pb.ResultOrExcepti" + - "on\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameByt" + - "esPair\"x\n\014MultiRequest\022,\n\014regionAction\030\001" + - " \003(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceGro" + - "up\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb.C" + - "ondition\"\\\n\rMultiResponse\0228\n\022regionActio", - "nResult\030\001 \003(\0132\034.hbase.pb.RegionActionRes" + - "ult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consistency\022\n\n" + - "\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\203\004\n\rClientServic" + - "e\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025.hbase.p" + - "b.GetResponse\022;\n\006Mutate\022\027.hbase.pb.Mutat" + - "eRequest\032\030.hbase.pb.MutateResponse\0225\n\004Sc" + - "an\022\025.hbase.pb.ScanRequest\032\026.hbase.pb.Sca" + - "nResponse\022P\n\rBulkLoadHFile\022\036.hbase.pb.Bu" + - "lkLoadHFileRequest\032\037.hbase.pb.BulkLoadHF" + - "ileResponse\022X\n\013ExecService\022#.hbase.pb.Co", - "processorServiceRequest\032$.hbase.pb.Copro" + - "cessorServiceResponse\022d\n\027ExecRegionServe" + - "rService\022#.hbase.pb.CoprocessorServiceRe" + + "_metrics\030\t \001(\010\022\024\n\005renew\030\n \001(\010:\005false\"\232\002\n" + + "\014ScanResponse\022\030\n\020cells_per_result\030\001 \003(\r\022" + + "\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(" + + "\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030\005 \003(\0132\020.hbase." 
+ + "pb.Result\022\r\n\005stale\030\006 \001(\010\022\037\n\027partial_flag", + "_per_result\030\007 \003(\010\022\036\n\026more_results_in_reg" + + "ion\030\010 \001(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014" + + "scan_metrics\030\n \001(\0132\025.hbase.pb.ScanMetric" + + "s\"\305\001\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" + + "(\0132\031.hbase.pb.RegionSpecifier\022>\n\013family_" + + "path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReque" + + "st.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n" + + "\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(" + + "\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 \002" + + "(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(", + "\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030\003" + + " \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030CoprocessorServ" + + "iceResult\022&\n\005value\030\001 \001(\0132\027.hbase.pb.Name" + + "BytesPair\"v\n\031CoprocessorServiceRequest\022)" + + "\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifie" + + "r\022.\n\004call\030\002 \002(\0132 .hbase.pb.CoprocessorSe" + + "rviceCall\"o\n\032CoprocessorServiceResponse\022" + + ")\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifi" + + "er\022&\n\005value\030\002 \002(\0132\027.hbase.pb.NameBytesPa" + + "ir\"\226\001\n\006Action\022\r\n\005index\030\001 \001(\r\022)\n\010mutation", + "\030\002 \001(\0132\027.hbase.pb.MutationProto\022\032\n\003get\030\003" + + " \001(\0132\r.hbase.pb.Get\0226\n\014service_call\030\004 \001(" + + "\0132 .hbase.pb.CoprocessorServiceCall\"k\n\014R" + + "egionAction\022)\n\006region\030\001 \002(\0132\031.hbase.pb.R" + + "egionSpecifier\022\016\n\006atomic\030\002 \001(\010\022 \n\006action" + + "\030\003 \003(\0132\020.hbase.pb.Action\"c\n\017RegionLoadSt" + + "ats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccu" + + "pancy\030\002 
\001(\005:\0010\022\035\n\022compactionPressure\030\003 \001" + + "(\005:\0010\"\332\001\n\021ResultOrException\022\r\n\005index\030\001 \001" + + "(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*\n\t", + "exception\030\003 \001(\0132\027.hbase.pb.NameBytesPair" + + "\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Copr" + + "ocessorServiceResult\022,\n\tloadStats\030\005 \001(\0132" + + "\031.hbase.pb.RegionLoadStats\"x\n\022RegionActi" + + "onResult\0226\n\021resultOrException\030\001 \003(\0132\033.hb" + + "ase.pb.ResultOrException\022*\n\texception\030\002 " + + "\001(\0132\027.hbase.pb.NameBytesPair\"x\n\014MultiReq" + + "uest\022,\n\014regionAction\030\001 \003(\0132\026.hbase.pb.Re" + + "gionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tcondit" + + "ion\030\003 \001(\0132\023.hbase.pb.Condition\"\\\n\rMultiR", + "esponse\0228\n\022regionActionResult\030\001 \003(\0132\034.hb" + + "ase.pb.RegionActionResult\022\021\n\tprocessed\030\002" + + " \001(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMEL" + + "INE\020\0012\203\004\n\rClientService\0222\n\003Get\022\024.hbase.p" + + "b.GetRequest\032\025.hbase.pb.GetResponse\022;\n\006M" + + "utate\022\027.hbase.pb.MutateRequest\032\030.hbase.p" + + "b.MutateResponse\0225\n\004Scan\022\025.hbase.pb.Scan" + + "Request\032\026.hbase.pb.ScanResponse\022P\n\rBulkL" + + "oadHFile\022\036.hbase.pb.BulkLoadHFileRequest" + + "\032\037.hbase.pb.BulkLoadHFileResponse\022X\n\013Exe", + "cService\022#.hbase.pb.CoprocessorServiceRe" + "quest\032$.hbase.pb.CoprocessorServiceRespo" + - "nse\0228\n\005Multi\022\026.hbase.pb.MultiRequest\032\027.h" + - "base.pb.MultiResponseBB\n*org.apache.hado" + - "op.hbase.protobuf.generatedB\014ClientProto" + - "sH\001\210\001\001\240\001\001" + "nse\022d\n\027ExecRegionServerService\022#.hbase.p" + + "b.CoprocessorServiceRequest\032$.hbase.pb.C" + + "oprocessorServiceResponse\0228\n\005Multi\022\026.hba" + + 
"se.pb.MultiRequest\032\027.hbase.pb.MultiRespo" + + "nseBB\n*org.apache.hadoop.hbase.protobuf." + + "generatedB\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -34321,7 +34411,7 @@ public final class ClientProtos { internal_static_hbase_pb_ScanRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_ScanRequest_descriptor, - new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", }); + new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", }); internal_static_hbase_pb_ScanResponse_descriptor = getDescriptor().getMessageTypes().get(13); internal_static_hbase_pb_ScanResponse_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/hbase/blob/86a417ee/hbase-protocol/src/main/protobuf/Client.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index a3a969f..ca9abdc 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -277,6 +277,7 @@ message ScanRequest { optional bool client_handles_partials = 7; optional bool client_handles_heartbeats = 8; optional bool track_scan_metrics = 9; + optional bool renew = 10 [default = false]; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/86a417ee/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 3b254c0..75705e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2517,6 +2517,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ttl = this.scannerLeaseTimeoutPeriod; } assert scanner != null; + if (request.hasRenew() && request.getRenew()) { + lease = regionServer.leases.removeLease(scannerName); + if (lease != null && scanners.containsKey(scannerName)) { + regionServer.leases.addLease(lease); + } + return builder.build(); + } RpcCallContext context = RpcServer.getCurrentCall(); Object lastBlock = null; http://git-wip-us.apache.org/repos/asf/hbase/blob/86a417ee/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 96bc59f..c451cf0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -476,29 +476,4 @@ public class TestFromClientSide3 { assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES)); table.close(); } - - @Test - public void testLeaseRenewal() throws Exception { - Table table = TEST_UTIL.createTable( - TableName.valueOf("testLeaseRenewal"), FAMILY); - Put p = new Put(ROW_BYTES); - p.addColumn(FAMILY, COL_QUAL, VAL_BYTES); - table.put(p); - p = new Put(ANOTHERROW); - p.addColumn(FAMILY, COL_QUAL, VAL_BYTES); - table.put(p); - Scan s = new Scan(); - s.setCaching(1); - ResultScanner rs = table.getScanner(s); 
- // make sure that calling renewLease does not impact the scan results - assertTrue(rs.renewLease()); - assertTrue(Arrays.equals(rs.next().getRow(), ANOTHERROW)); - assertTrue(rs.renewLease()); - assertTrue(Arrays.equals(rs.next().getRow(), ROW_BYTES)); - assertTrue(rs.renewLease()); - assertNull(rs.next()); - assertFalse(rs.renewLease()); - rs.close(); - table.close(); - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/86a417ee/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java new file mode 100644 index 0000000..c89edf5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java @@ -0,0 +1,125 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestLeaseRenewal { + final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow"); + private final static byte[] COL_QUAL = Bytes.toBytes("f1"); + private final static byte[] VAL_BYTES = Bytes.toBytes("v1"); + private final static byte[] ROW_BYTES = Bytes.toBytes("r1"); + private final static int leaseTimeout = + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD / 4; + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + leaseTimeout); + TEST_UTIL.startMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + // Nothing to do. 
+ } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + for (HTableDescriptor htd : TEST_UTIL.getHBaseAdmin().listTables()) { + LOG.info("Tear down, remove table=" + htd.getTableName()); + TEST_UTIL.deleteTable(htd.getTableName()); + } + } + + @Test + public void testLeaseRenewal() throws Exception { + HTable table = TEST_UTIL.createTable( + TableName.valueOf("testLeaseRenewal"), FAMILY); + Put p = new Put(ROW_BYTES); + p.addColumn(FAMILY, COL_QUAL, VAL_BYTES); + table.put(p); + p = new Put(ANOTHERROW); + p.addColumn(FAMILY, COL_QUAL, VAL_BYTES); + table.put(p); + Scan s = new Scan(); + s.setCaching(1); + ResultScanner rs = table.getScanner(s); + // make sure that calling renewLease does not impact the scan results + assertTrue(rs.renewLease()); + assertTrue(Arrays.equals(rs.next().getRow(), ANOTHERROW)); + // renew the lease a few times, long enough to be sure + // the lease would have expired otherwise + Thread.sleep(leaseTimeout/2); + assertTrue(rs.renewLease()); + Thread.sleep(leaseTimeout/2); + assertTrue(rs.renewLease()); + Thread.sleep(leaseTimeout/2); + assertTrue(rs.renewLease()); + // make sure we haven't advanced the scanner + assertTrue(Arrays.equals(rs.next().getRow(), ROW_BYTES)); + assertTrue(rs.renewLease()); + // make sure scanner is exhausted now + assertNull(rs.next()); + // renewLease should return false now + assertFalse(rs.renewLease()); + rs.close(); + table.close(); + } +}
