HBASE-15816 Provide client with ability to set priority on Operations Signed-off-by: Andrew Purtell <apurt...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/26247996 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/26247996 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/26247996 Branch: refs/heads/branch-1 Commit: 26247996d25dad38678fed2e2a1b8f0d383df082 Parents: 6f1cc2c Author: rgidwani <rgidw...@salesforce.com> Authored: Fri Jul 21 12:20:24 2017 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Fri Jul 21 17:12:25 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/client/Action.java | 8 ++++++ .../org/apache/hadoop/hbase/client/Append.java | 6 +++++ .../hadoop/hbase/client/AsyncProcess.java | 18 ++++++++++--- .../org/apache/hadoop/hbase/client/Delete.java | 7 +++++ .../org/apache/hadoop/hbase/client/Get.java | 6 +++++ .../org/apache/hadoop/hbase/client/HTable.java | 27 +++++++++++++------- .../apache/hadoop/hbase/client/Increment.java | 6 +++++ .../apache/hadoop/hbase/client/MultiAction.java | 10 ++++++++ .../hbase/client/MultiServerCallable.java | 5 ++-- .../apache/hadoop/hbase/client/Mutation.java | 5 +++- .../hbase/client/OperationWithAttributes.java | 11 ++++++++ .../client/PayloadCarryingServerCallable.java | 10 ++++++-- .../hbase/client/RegionServerCallable.java | 11 ++++++++ .../hadoop/hbase/client/RowMutations.java | 8 ++++++ .../RpcRetryingCallerWithReadReplicas.java | 3 ++- .../org/apache/hadoop/hbase/client/Scan.java | 7 +++++ .../hadoop/hbase/client/ScannerCallable.java | 2 +- .../hadoop/hbase/ipc/HBaseRpcController.java | 2 -- .../hbase/ipc/HBaseRpcControllerImpl.java | 6 ++--- .../org/apache/hadoop/hbase/ipc/IPCUtil.java | 3 ++- .../hbase/ipc/RegionCoprocessorRpcChannel.java | 3 ++- .../org/apache/hadoop/hbase/HConstants.java | 1 + .../hbase/client/TestRpcControllerFactory.java | 27 +++++++++++++++++--- .../apache/hadoop/hbase/io/TestHeapSize.java | 2 ++ 24 files changed, 164 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java index 2bc5d79..5417b6b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java @@ -34,11 +34,17 @@ public class Action<R> implements Comparable<R> { private int originalIndex; private long nonce = HConstants.NO_NONCE; private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID; + private int priority; public Action(Row action, int originalIndex) { + this(action, originalIndex, HConstants.PRIORITY_UNSET); + } + + public Action(Row action, int originalIndex, int priority) { super(); this.action = action; this.originalIndex = originalIndex; + this.priority = priority; } /** @@ -75,6 +81,8 @@ public class Action<R> implements Comparable<R> { return replicaId; } + public int getPriority() { return priority; } + @SuppressWarnings("rawtypes") @Override public int compareTo(Object o) { http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java index f20f727..ec4ea37 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java @@ -86,6 +86,7 @@ public class Append extends Mutation { for (Map.Entry<String, byte[]> entry : a.getAttributesMap().entrySet()) { this.setAttribute(entry.getKey(), entry.getValue()); } + this.setPriority(a.getPriority()); } /** Create a Append operation for the specified row. @@ -184,6 +185,11 @@ public class Append extends Mutation { } @Override + public Append setPriority(int priority) { + return (Append) super.setPriority(priority); + } + + @Override public Append setTTL(long ttl) { return (Append) super.setTTL(ttl); } http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 73cafc1..10d4f38 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -504,7 +504,11 @@ class AsyncProcess { LOG.error("Failed to get region location ", ex); // This action failed before creating ars. Retain it, but do not add to submit list. // We will then add it to ars in an already-failed state. - retainedActions.add(new Action<Row>(r, ++posInList)); + int priority = HConstants.NORMAL_QOS; + if (r instanceof Mutation) { + priority = ((Mutation) r).getPriority(); + } + retainedActions.add(new Action<Row>(r, ++posInList, priority)); locationErrors.add(ex); locationErrorRows.add(posInList); it.remove(); @@ -516,7 +520,11 @@ class AsyncProcess { break; } if (code == ReturnCode.INCLUDE) { - Action<Row> action = new Action<Row>(r, ++posInList); + int priority = HConstants.NORMAL_QOS; + if (r instanceof Mutation) { + priority = ((Mutation) r).getPriority(); + } + Action<Row> action = new Action<Row>(r, ++posInList, priority); setNonce(ng, r, action); retainedActions.add(action); // TODO: replica-get is not supported on this path @@ -619,6 +627,7 @@ class AsyncProcess { // The position will be used by the processBatch to match the object array returned. int posInList = -1; NonceGenerator ng = this.connection.getNonceGenerator(); + int highestPriority = HConstants.PRIORITY_UNSET; for (Row r : rows) { posInList++; if (r instanceof Put) { @@ -626,8 +635,9 @@ class AsyncProcess { if (put.isEmpty()) { throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item"); } + highestPriority = Math.max(put.getPriority(), highestPriority); } - Action<Row> action = new Action<Row>(r, posInList); + Action<Row> action = new Action<Row>(r, posInList, highestPriority); setNonce(ng, r, action); actions.add(action); } @@ -1782,7 +1792,7 @@ class AsyncProcess { protected MultiServerCallable<Row> createCallable(final ServerName server, TableName tableName, final MultiAction<Row> multi) { return new MultiServerCallable<Row>(connection, tableName, server, - AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker); + AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker, multi.getPriority()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java index bdacf93..4e1fe09 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java @@ -147,6 +147,7 @@ public class Delete extends Mutation implements Comparable<Row> { for (Map.Entry<String, byte[]> entry : d.getAttributesMap().entrySet()) { this.setAttribute(entry.getKey(), entry.getValue()); } + super.setPriority(d.getPriority()); } /** @@ -478,4 +479,10 @@ public class Delete extends Mutation implements Comparable<Row> { public Delete setTTL(long ttl) { throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported"); } + + @Override + public Delete setPriority(int priority) { + return (Delete) super.setPriority(priority); + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java index 88da0b0..2a1e9f2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java @@ -130,6 +130,7 @@ public class Get extends Query TimeRange tr = entry.getValue(); setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax()); } + super.setPriority(get.getPriority()); } public boolean isCheckExistenceOnly() { @@ -511,4 +512,9 @@ public class Get extends Query return (Get) super.setIsolationLevel(level); } + @Override + public Get setPriority(int priority) { + return (Get) super.setPriority(priority); + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index d4fa2e3..e9531f3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -846,13 +846,14 @@ public class HTable implements HTableInterface, RegionLocator { // Good old call. final Get getReq = get; RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, - getName(), get.getRow()) { + getName(), get.getRow(), get.getPriority()) { @Override public Result call(int callTimeout) throws IOException { ClientProtos.GetRequest request = RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq); HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); + controller.setPriority(getPriority()); controller.setCallTimeout(callTimeout); try { ClientProtos.GetResponse response = getStub().get(controller, request); @@ -973,11 +974,12 @@ public class HTable implements HTableInterface, RegionLocator { public void delete(final Delete delete) throws IOException { RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection, - tableName, delete.getRow()) { + tableName, delete.getRow(), delete.getPriority()) { @Override public Boolean call(int callTimeout) throws IOException { HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); + controller.setPriority(getPriority()); controller.setCallTimeout(callTimeout); try { @@ -1055,6 +1057,7 @@ public class HTable implements HTableInterface, RegionLocator { public MultiResponse call(int callTimeout) throws IOException { controller.reset(); controller.setPriority(tableName); + controller.setPriority(rm.getMaxPriority()); int remainingTime = tracker.getRemainingTime(callTimeout); if (remainingTime == 0) { throw new DoNotRetryIOException("Timeout for mutate row"); @@ -1103,12 +1106,12 @@ public class HTable implements HTableInterface, RegionLocator { NonceGenerator ng = this.connection.getNonceGenerator(); final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); RegionServerCallable<Result> callable = - new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) { + new RegionServerCallable<Result>(this.connection, getName(), append.getRow(), append.getPriority()) { @Override public Result call(int callTimeout) throws IOException { HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); + controller.setCallTimeout(getPriority()); try { MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce); @@ -1136,11 +1139,12 @@ public class HTable implements HTableInterface, RegionLocator { NonceGenerator ng = this.connection.getNonceGenerator(); final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, - getName(), increment.getRow()) { + getName(), increment.getRow(), increment.getPriority()) { @Override public Result call(int callTimeout) throws IOException { HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(getTableName()); + controller.setPriority(getPriority()); controller.setCallTimeout(callTimeout); try { MutateRequest request = RequestConverter.buildMutateRequest( @@ -1236,11 +1240,12 @@ public class HTable implements HTableInterface, RegionLocator { final Put put) throws IOException { RegionServerCallable<Boolean> callable = - new RegionServerCallable<Boolean>(connection, getName(), row) { + new RegionServerCallable<Boolean>(connection, getName(), row, put.getPriority()) { @Override public Boolean call(int callTimeout) throws IOException { HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); + controller.setPriority(getPriority()); controller.setCallTimeout(callTimeout); try { MutateRequest request = RequestConverter.buildMutateRequest( @@ -1266,11 +1271,12 @@ public class HTable implements HTableInterface, RegionLocator { final Put put) throws IOException { RegionServerCallable<Boolean> callable = - new RegionServerCallable<Boolean>(connection, getName(), row) { + new RegionServerCallable<Boolean>(connection, getName(), row, put.getPriority()) { @Override public Boolean call(int callTimeout) throws IOException { HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); + controller.setPriority(getPriority()); controller.setCallTimeout(callTimeout); try { CompareType compareType = CompareType.valueOf(compareOp.name()); @@ -1297,11 +1303,12 @@ public class HTable implements HTableInterface, RegionLocator { final Delete delete) throws IOException { RegionServerCallable<Boolean> callable = - new RegionServerCallable<Boolean>(connection, getName(), row) { + new RegionServerCallable<Boolean>(connection, getName(), row, delete.getPriority()) { @Override public Boolean call(int callTimeout) throws IOException { HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); + controller.setPriority(getPriority()); controller.setCallTimeout(callTimeout); try { MutateRequest request = RequestConverter.buildMutateRequest( @@ -1327,11 +1334,12 @@ public class HTable implements HTableInterface, RegionLocator { final Delete delete) throws IOException { RegionServerCallable<Boolean> callable = - new RegionServerCallable<Boolean>(connection, getName(), row) { + new RegionServerCallable<Boolean>(connection, getName(), row, delete.getPriority()) { @Override public Boolean call(int callTimeout) throws IOException { HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); + controller.setPriority(getPriority()); controller.setCallTimeout(callTimeout); try { CompareType compareType = CompareType.valueOf(compareOp.name()); @@ -1364,6 +1372,7 @@ public class HTable implements HTableInterface, RegionLocator { public MultiResponse call(int callTimeout) throws IOException { controller.reset(); controller.setPriority(tableName); + controller.setPriority(rm.getMaxPriority()); int remainingTime = tracker.getRemainingTime(callTimeout); if (remainingTime == 0) { throw new DoNotRetryIOException("Timeout for mutate row"); http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java index b60cbde..22885d8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java @@ -86,6 +86,7 @@ public class Increment extends Mutation implements Comparable<Row> { for (Map.Entry<String, byte[]> entry : i.getAttributesMap().entrySet()) { this.setAttribute(entry.getKey(), entry.getValue()); } + super.setPriority(i.getPriority()); } /** @@ -351,4 +352,9 @@ public class Increment extends Mutation implements Comparable<Row> { public Increment setTTL(long ttl) { return (Increment) super.setTTL(ttl); } + + @Override + public Increment setPriority(int priority) { + return (Increment) super.setPriority(priority); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java index 0a9055e..3ab1dbf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java @@ -104,4 +104,14 @@ public final class MultiAction<R> { public long getNonceGroup() { return this.nonceGroup; } + + public int getPriority() { + int maxPriority = HConstants.PRIORITY_UNSET; + for (List<Action<R>> actionList : actions.values()) { + for (Action<R> action : actionList) { + maxPriority = Math.max(maxPriority, action.getPriority()); + } + } + return maxPriority; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 738ff6e..42c63eb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -57,8 +57,8 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse MultiServerCallable(final ClusterConnection connection, final TableName tableName, final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi, - int rpcTimeout, RetryingTimeTracker tracker) { - super(connection, tableName, null, rpcFactory); + int rpcTimeout, RetryingTimeTracker tracker, int priority) { + super(connection, tableName, null, rpcFactory, priority); this.multiAction = multi; // RegionServerCallable has HRegionLocation field, but this is a multi-region request. // Using region info from parent HRegionLocation would be a mistake for this class; so @@ -130,6 +130,7 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse controller.reset(); if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells)); controller.setPriority(getTableName()); + controller.setPriority(getPriority()); controller.setCallTimeout(callTimeout); ClientProtos.MultiResponse responseProto; ClientProtos.MultiRequest requestProto = multiRequestBuilder.build(); http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java index d11c459..cc46137 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java @@ -71,7 +71,10 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C // familyMap ClassSize.REFERENCE + // familyMap - ClassSize.TREEMAP); + ClassSize.TREEMAP + + // priority + ClassSize.INTEGER + ); /** * The attribute for storing the list of clusters that have consumed the change. http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java index d9d54ea..1619f6d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.util.Bytes; @@ -36,6 +37,7 @@ public abstract class OperationWithAttributes extends Operation implements Attri // used for uniquely identifying an operation public static final String ID_ATRIBUTE = "_operation.attributes.id"; + private int priority = HConstants.PRIORITY_UNSET; @Override public OperationWithAttributes setAttribute(String name, byte[] value) { @@ -110,4 +112,13 @@ public abstract class OperationWithAttributes extends Operation implements Attri byte[] attr = getAttribute(ID_ATRIBUTE); return attr == null? null: Bytes.toString(attr); } + + public OperationWithAttributes setPriority(int priority) { + this.priority = priority; + return this; + } + + public int getPriority() { + return priority; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java index aa3d5c0..7532057 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java @@ -16,6 +16,7 @@ */ package org.apache.hadoop.hbase.client; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.HBaseRpcController; @@ -31,8 +32,13 @@ public abstract class PayloadCarryingServerCallable<T> protected HBaseRpcController controller; public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row, - RpcControllerFactory rpcControllerFactory) { - super(connection, tableName, row); + RpcControllerFactory rpcControllerFactory) { + this(connection, tableName, row, rpcControllerFactory, HConstants.NORMAL_QOS); + } + + public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row, + RpcControllerFactory rpcControllerFactory, int priority) { + super(connection, tableName, row, priority); this.controller = rpcControllerFactory.newController(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index e0b09f3..7eb0932 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -50,6 +51,7 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> { protected final byte[] row; protected HRegionLocation location; private ClientService.BlockingInterface stub; + protected int priority; /** * @param connection Connection to use. @@ -57,9 +59,14 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> { * @param row The row we want in <code>tableName</code>. */ public RegionServerCallable(Connection connection, TableName tableName, byte [] row) { + this(connection, tableName, row, HConstants.NORMAL_QOS); + } + + public RegionServerCallable(Connection connection, TableName tableName, byte [] row, int priority) { this.connection = connection; this.tableName = tableName; this.row = row; + this.priority = priority; } /** @@ -117,6 +124,10 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> { return this.row; } + public int getPriority() { + return priority; + } + @Override public void throwable(Throwable t, boolean retrying) { if (location != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java index 888306d..c5ce4ea 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java @@ -114,4 +114,12 @@ public class RowMutations implements Row { public List<Mutation> getMutations() { return Collections.unmodifiableList(mutations); } + + public int getMaxPriority() { + int maxPriority = Integer.MIN_VALUE; + for (Mutation mutation : mutations) { + maxPriority = Math.max(maxPriority, mutation.getPriority()); + } + return maxPriority; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index e6954cc..bfae3d2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -38,6 +38,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; @@ -100,7 +101,7 @@ public class RpcRetryingCallerWithReadReplicas { public ReplicaRegionServerCallable(int id, HRegionLocation location) { super(RpcRetryingCallerWithReadReplicas.this.cConnection, - RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow()); + RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(), HConstants.PRIORITY_UNSET); this.id = id; this.location = location; this.controller = rpcControllerFactory.newController(); http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 9b8724c..4efd405 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -278,6 +278,7 @@ public class Scan extends Query { this.mvccReadPoint = scan.getMvccReadPoint(); this.limit = scan.getLimit(); this.needCursorResult = scan.isNeedCursorResult(); + setPriority(scan.getPriority()); } /** @@ -307,6 +308,7 @@ public class Scan extends Query { setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax()); } this.mvccReadPoint = -1L; + setPriority(get.getPriority()); } public boolean isGetScan() { @@ -1088,6 +1090,11 @@ public class Scan extends Query { return (Scan) super.setIsolationLevel(level); } + @Override + public Scan setPriority(int priority) { + return (Scan) super.setPriority(priority); + } + /** * Utility that creates a Scan that will do a small scan in reverse from passed row * looking for next closest row. http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index caa9dec..d8d6e7b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -131,7 +131,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { */ public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan, ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) { - super(connection, tableName, scan.getStartRow()); + super(connection, tableName, scan.getStartRow(), scan.getPriority()); this.id = id; this.cConnection = connection; this.scan = scan; http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java index 2c4b335..e7da60b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java @@ -37,8 +37,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.Private public interface HBaseRpcController extends RpcController, CellScannable { - static final int PRIORITY_UNSET = -1; - /** * Only used to send cells to rpc server, the returned cells should be set by * {@link #setDone(CellScanner)}. http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java index a976473..0f20c00 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java @@ -56,7 +56,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { * This is the ordained way of setting priorities going forward. We will be undoing the old * annotation-based mechanism. */ - private int priority = PRIORITY_UNSET; + private int priority = HConstants.PRIORITY_UNSET; /** * They are optionally set on construction, cleared after we make the call, and then optionally @@ -95,7 +95,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { @Override public void setPriority(int priority) { - this.priority = priority; + this.priority = Math.max(this.priority, priority); } @Override @@ -106,7 +106,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { @Override public int getPriority() { - return priority; + return priority < 0 ? HConstants.NORMAL_QOS : priority; } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 4fa58ad..9a4a5c6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -29,6 +29,7 @@ import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; @@ -111,7 +112,7 @@ class IPCUtil { builder.setCellBlockMeta(cellBlockMeta); } // Only pass priority if there is one set. - if (call.priority != HBaseRpcController.PRIORITY_UNSET) { + if (call.priority != HConstants.PRIORITY_UNSET) { builder.setPriority(call.priority); } builder.setTimeout(call.timeout); http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java index 0052423..f942aed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -26,6 +26,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; @@ -84,7 +85,7 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ final ClientProtos.CoprocessorServiceCall call = CoprocessorRpcUtils.buildServiceCall(row, method, request); RegionServerCallable<CoprocessorServiceResponse> callable = - new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) { + new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row, HConstants.PRIORITY_UNSET) { @Override public CoprocessorServiceResponse call(int callTimeout) throws Exception { if (rpcController instanceof HBaseRpcController) { http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 93461f9..8df7bd8 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1114,6 +1114,7 @@ public final class HConstants { * handled by high priority handlers. */ // normal_QOS < QOS_threshold < replication_QOS < replay_QOS < admin_QOS < high_QOS + public static final int PRIORITY_UNSET = -1; public static final int NORMAL_QOS = 0; public static final int QOS_THRESHOLD = 10; public static final int HIGH_QOS = 200; http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java index 1d49460..f5cfa2c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java @@ -20,13 +20,16 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import com.google.common.collect.ConcurrentHashMultiset; import com.google.common.collect.Lists; import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.collect.Multiset; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; @@ -72,6 +75,7 @@ public class TestRpcControllerFactory { public static class CountingRpcController extends DelegatingHBaseRpcController { + private static Multiset<Integer> GROUPED_PRIORITY = ConcurrentHashMultiset.create(); private static AtomicInteger INT_PRIORITY = new AtomicInteger(); private static AtomicInteger TABLE_PRIORITY = new AtomicInteger(); @@ -81,8 +85,13 @@ public class TestRpcControllerFactory { @Override public void setPriority(int priority) { + int oldPriority = getPriority(); super.setPriority(priority); - INT_PRIORITY.incrementAndGet(); + int newPriority = getPriority(); + if (newPriority != oldPriority) { + INT_PRIORITY.incrementAndGet(); + GROUPED_PRIORITY.add(priority); + } } @Override @@ -189,6 +198,14 @@ public class TestRpcControllerFactory { scanInfo.setSmall(false); counter = doScan(table, scanInfo, counter); + // make sure we have no priority count + verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0); + // lets set a custom priority on a get + Get get = new Get(row); + get.setPriority(HConstants.ADMIN_QOS); + table.get(get); + verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1); + table.close(); } @@ -200,9 +217,13 @@ public class TestRpcControllerFactory { } int verifyCount(Integer counter) { - assertEquals(counter.intValue(), CountingRpcController.TABLE_PRIORITY.get()); + assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter); assertEquals(0, CountingRpcController.INT_PRIORITY.get()); - return counter + 1; + return CountingRpcController.TABLE_PRIORITY.get() + 1; + } + + void verifyPriorityGroupCount(int priorityLevel, int count) { + assertEquals(count, CountingRpcController.GROUPED_PRIORITY.count(priorityLevel)); } @Test http://git-wip-us.apache.org/repos/asf/hbase/blob/26247996/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index 1ea65fa..12559e7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -377,6 +377,7 @@ public class TestHeapSize { expected = ClassSize.estimateBase(cl, false); //The actual TreeMap is not included in the above calculation expected += ClassSize.align(ClassSize.TREEMAP); + expected += ClassSize.align(ClassSize.INTEGER); // priority if (expected != actual) { ClassSize.estimateBase(cl, true); assertEquals(expected, actual); @@ -387,6 +388,7 @@ public class TestHeapSize { expected = ClassSize.estimateBase(cl, false); //The actual TreeMap is not included in the above calculation expected += ClassSize.align(ClassSize.TREEMAP); + expected += ClassSize.align(ClassSize.INTEGER); // priority if (expected != actual) { ClassSize.estimateBase(cl, true); assertEquals(expected, actual);