This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new a2f58071ae1 HBASE-29090: Add server-side load metrics to client
results (#6623)
a2f58071ae1 is described below
commit a2f58071ae1ff3dfef00aede8d888ac6aabd7149
Author: Hernan Romer <[email protected]>
AuthorDate: Wed Apr 23 11:17:34 2025 -0400
HBASE-29090: Add server-side load metrics to client results (#6623)
Co-authored-by: Hernan Gelaf-Romer <[email protected]>
Signed-off-by: Nick Dimiduk <[email protected]>
Signed-off-by: Ray Mattingly <[email protected]>
---
.../apache/hadoop/hbase/client/CheckAndMutate.java | 54 +++--
.../hadoop/hbase/client/CheckAndMutateResult.java | 11 +
.../java/org/apache/hadoop/hbase/client/Get.java | 2 +
.../java/org/apache/hadoop/hbase/client/Query.java | 24 +++
...CheckAndMutateResult.java => QueryMetrics.java} | 26 +--
.../hadoop/hbase/client/RawAsyncTableImpl.java | 18 +-
.../org/apache/hadoop/hbase/client/Result.java | 11 +
.../java/org/apache/hadoop/hbase/client/Scan.java | 3 +
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 44 +++-
.../hbase/shaded/protobuf/RequestConverter.java | 19 +-
.../hbase/shaded/protobuf/ResponseConverter.java | 23 +-
.../org/apache/hadoop/hbase/client/TestGet.java | 2 +
.../hadoop/hbase/client/TestOnlineLogRecord.java | 30 +--
.../org/apache/hadoop/hbase/client/TestScan.java | 7 +-
.../hbase/shaded/protobuf/TestProtobufUtil.java | 2 +
.../src/main/protobuf/client/Client.proto | 18 ++
.../apache/hadoop/hbase/regionserver/HRegion.java | 11 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 30 ++-
.../hbase/regionserver/RegionScannerImpl.java | 4 +
.../hbase/client/TestAsyncTableQueryMetrics.java | 239 +++++++++++++++++++++
.../hbase/client/TestMalformedCellFromClient.java | 2 +-
21 files changed, 505 insertions(+), 75 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
index b31a0b27242..e9a46291ef9 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
@@ -70,6 +70,7 @@ public final class CheckAndMutate implements Row {
private byte[] value;
private Filter filter;
private TimeRange timeRange;
+ private boolean queryMetricsEnabled = false;
private Builder(byte[] row) {
this.row = Preconditions.checkNotNull(row, "row is null");
@@ -133,6 +134,21 @@ public final class CheckAndMutate implements Row {
return this;
}
+ /**
+ * Enables the return of {@link QueryMetrics} alongside the corresponding
result for this query
+ * <p>
+ * This is intended for advanced users who need result-granular,
server-side metrics
+ * <p>
+ * Does not work
+ * @param queryMetricsEnabled {@code true} to enable collection of
per-result query metrics
+ * {@code false} to disable metrics collection
(resulting in
+ * {@code null} metrics)
+ */
+ public Builder queryMetricsEnabled(boolean queryMetricsEnabled) {
+ this.queryMetricsEnabled = queryMetricsEnabled;
+ return this;
+ }
+
private void preCheck(Row action) {
Preconditions.checkNotNull(action, "action is null");
if (!Bytes.equals(row, action.getRow())) {
@@ -154,9 +170,10 @@ public final class CheckAndMutate implements Row {
public CheckAndMutate build(Put put) {
preCheck(put);
if (filter != null) {
- return new CheckAndMutate(row, filter, timeRange, put);
+ return new CheckAndMutate(row, filter, timeRange, put,
queryMetricsEnabled);
} else {
- return new CheckAndMutate(row, family, qualifier, op, value,
timeRange, put);
+ return new CheckAndMutate(row, family, qualifier, op, value,
timeRange, put,
+ queryMetricsEnabled);
}
}
@@ -168,9 +185,10 @@ public final class CheckAndMutate implements Row {
public CheckAndMutate build(Delete delete) {
preCheck(delete);
if (filter != null) {
- return new CheckAndMutate(row, filter, timeRange, delete);
+ return new CheckAndMutate(row, filter, timeRange, delete,
queryMetricsEnabled);
} else {
- return new CheckAndMutate(row, family, qualifier, op, value,
timeRange, delete);
+ return new CheckAndMutate(row, family, qualifier, op, value,
timeRange, delete,
+ queryMetricsEnabled);
}
}
@@ -182,9 +200,10 @@ public final class CheckAndMutate implements Row {
public CheckAndMutate build(Increment increment) {
preCheck(increment);
if (filter != null) {
- return new CheckAndMutate(row, filter, timeRange, increment);
+ return new CheckAndMutate(row, filter, timeRange, increment,
queryMetricsEnabled);
} else {
- return new CheckAndMutate(row, family, qualifier, op, value,
timeRange, increment);
+ return new CheckAndMutate(row, family, qualifier, op, value,
timeRange, increment,
+ queryMetricsEnabled);
}
}
@@ -196,9 +215,10 @@ public final class CheckAndMutate implements Row {
public CheckAndMutate build(Append append) {
preCheck(append);
if (filter != null) {
- return new CheckAndMutate(row, filter, timeRange, append);
+ return new CheckAndMutate(row, filter, timeRange, append,
queryMetricsEnabled);
} else {
- return new CheckAndMutate(row, family, qualifier, op, value,
timeRange, append);
+ return new CheckAndMutate(row, family, qualifier, op, value,
timeRange, append,
+ queryMetricsEnabled);
}
}
@@ -210,9 +230,10 @@ public final class CheckAndMutate implements Row {
public CheckAndMutate build(RowMutations mutations) {
preCheck(mutations);
if (filter != null) {
- return new CheckAndMutate(row, filter, timeRange, mutations);
+ return new CheckAndMutate(row, filter, timeRange, mutations,
queryMetricsEnabled);
} else {
- return new CheckAndMutate(row, family, qualifier, op, value,
timeRange, mutations);
+ return new CheckAndMutate(row, family, qualifier, op, value,
timeRange, mutations,
+ queryMetricsEnabled);
}
}
}
@@ -234,9 +255,10 @@ public final class CheckAndMutate implements Row {
private final Filter filter;
private final TimeRange timeRange;
private final Row action;
+ private final boolean queryMetricsEnabled;
private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier, final
CompareOperator op,
- byte[] value, TimeRange timeRange, Row action) {
+ byte[] value, TimeRange timeRange, Row action, boolean
queryMetricsEnabled) {
this.row = row;
this.family = family;
this.qualifier = qualifier;
@@ -245,9 +267,11 @@ public final class CheckAndMutate implements Row {
this.filter = null;
this.timeRange = timeRange != null ? timeRange : TimeRange.allTime();
this.action = action;
+ this.queryMetricsEnabled = queryMetricsEnabled;
}
- private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row
action) {
+ private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row
action,
+ boolean queryMetricsEnabled) {
this.row = row;
this.family = null;
this.qualifier = null;
@@ -256,6 +280,7 @@ public final class CheckAndMutate implements Row {
this.filter = filter;
this.timeRange = timeRange != null ? timeRange : TimeRange.allTime();
this.action = action;
+ this.queryMetricsEnabled = queryMetricsEnabled;
}
/** Returns the row */
@@ -303,4 +328,9 @@ public final class CheckAndMutate implements Row {
public Row getAction() {
return action;
}
+
+ /** Returns whether query metrics are enabled */
+ public boolean isQueryMetricsEnabled() {
+ return queryMetricsEnabled;
+ }
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutateResult.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutateResult.java
index 8ecb49e3d5f..6ed6f7e26a9 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutateResult.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutateResult.java
@@ -27,6 +27,8 @@ public class CheckAndMutateResult {
private final boolean success;
private final Result result;
+ private QueryMetrics metrics = null;
+
public CheckAndMutateResult(boolean success, Result result) {
this.success = success;
this.result = result;
@@ -41,4 +43,13 @@ public class CheckAndMutateResult {
public Result getResult() {
return result;
}
+
+ public CheckAndMutateResult setMetrics(QueryMetrics metrics) {
+ this.metrics = metrics;
+ return this;
+ }
+
+ public QueryMetrics getMetrics() {
+ return metrics;
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
index e95dfd46606..5415da4cf91 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
@@ -95,6 +95,7 @@ public class Get extends Query implements Row {
this.setFilter(get.getFilter());
this.setReplicaId(get.getReplicaId());
this.setConsistency(get.getConsistency());
+ this.setQueryMetricsEnabled(get.isQueryMetricsEnabled());
// from Get
this.cacheBlocks = get.getCacheBlocks();
this.maxVersions = get.getMaxVersions();
@@ -453,6 +454,7 @@ public class Get extends Query implements Row {
map.put("colFamTimeRangeMap", colFamTimeRangeMapStr);
}
map.put("priority", getPriority());
+ map.put("queryMetricsEnabled", queryMetricsEnabled);
return map;
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java
index 944a7037682..243a6dac426 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.Filter;
@@ -46,6 +47,7 @@ public abstract class Query extends OperationWithAttributes {
protected Consistency consistency = Consistency.STRONG;
protected Map<byte[], TimeRange> colFamTimeRangeMap =
Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
protected Boolean loadColumnFamiliesOnDemand = null;
+ protected boolean queryMetricsEnabled = false;
public Filter getFilter() {
return filter;
@@ -157,6 +159,28 @@ public abstract class Query extends
OperationWithAttributes {
return this;
}
+ /**
+ * Enables the return of {@link QueryMetrics} alongside the corresponding
result(s) for this query
+ * <p>
+ * This is intended for advanced users who need result-granular, server-side
metrics
+ * <p>
+ * Does not work with calls to {@link Table#exists(Get)} and {@link
Table#exists(List)}
+ * @param enabled {@code true} to enable collection of per-result query
metrics {@code false} to
+ * disable metrics collection (resulting in {@code null}
metrics)
+ */
+ public Query setQueryMetricsEnabled(boolean enabled) {
+ this.queryMetricsEnabled = enabled;
+ return this;
+ }
+
+ /**
+ * Returns whether query metrics are enabled
+ * @return {@code true} if query metrics are enabled, {@code false} otherwise
+ */
+ public boolean isQueryMetricsEnabled() {
+ return queryMetricsEnabled;
+ }
+
/**
* Returns The isolation level of this query. If no isolation level was set
for this query object,
* then it returns READ_COMMITTED.
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutateResult.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/QueryMetrics.java
similarity index 62%
copy from
hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutateResult.java
copy to
hbase-client/src/main/java/org/apache/hadoop/hbase/client/QueryMetrics.java
index 8ecb49e3d5f..9243b43bb7e 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutateResult.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/QueryMetrics.java
@@ -18,27 +18,19 @@
package org.apache.hadoop.hbase.client;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
-/**
- * Represents a result of a CheckAndMutate operation
- */
@InterfaceAudience.Public
-public class CheckAndMutateResult {
- private final boolean success;
- private final Result result;
-
- public CheckAndMutateResult(boolean success, Result result) {
- this.success = success;
- this.result = result;
- }
[email protected]
+public class QueryMetrics {
+ private final long blockBytesScanned;
- /** Returns Whether the CheckAndMutate operation is successful or not */
- public boolean isSuccess() {
- return success;
+ public QueryMetrics(long blockBytesScanned) {
+ this.blockBytesScanned = blockBytesScanned;
}
- /** Returns It is used only for CheckAndMutate operations with
Increment/Append. Otherwise null */
- public Result getResult() {
- return result;
+ @InterfaceStability.Evolving
+ public long getBlockBytesScanned() {
+ return blockBytesScanned;
}
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index f2f00ed727d..2f8b0dff162 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -358,7 +358,7 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row,
put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) ->
RawAsyncTableImpl.mutate(controller, loc, stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family,
qualifier, op, value,
- null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
+ null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE,
false),
(c, r) -> r.getProcessed()))
.call(),
supplier);
@@ -374,7 +374,7 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row,
delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) ->
RawAsyncTableImpl.mutate(controller, loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family,
qualifier, op, value,
- null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
+ null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE,
false),
(c, r) -> r.getProcessed()))
.call(),
supplier);
@@ -392,7 +392,7 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
.action((controller, loc, stub) ->
RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
mutations,
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family,
qualifier, op, value,
- null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
+ null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE,
false),
CheckAndMutateResult::isSuccess))
.call(), supplier);
}
@@ -433,7 +433,7 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row,
put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) ->
RawAsyncTableImpl.mutate(controller, loc, stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, null,
null, null, null, filter,
- timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
+ timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE, false),
(c, r) -> r.getProcessed()))
.call(),
supplier);
@@ -448,7 +448,7 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row,
delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) ->
RawAsyncTableImpl.mutate(controller, loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, null,
null, null, null, filter,
- timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
+ timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE, false),
(c, r) -> r.getProcessed()))
.call(),
supplier);
@@ -465,7 +465,7 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
.action((controller, loc, stub) ->
RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
mutations,
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null,
null, null, filter,
- timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
+ timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE, false),
CheckAndMutateResult::isSuccess))
.call(), supplier);
}
@@ -500,7 +500,8 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
(rn, m) -> RequestConverter.buildMutateRequest(rn,
checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
- checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m,
nonceGroup, nonce),
+ checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m,
nonceGroup, nonce,
+ checkAndMutate.isQueryMetricsEnabled()),
(c, r) -> ResponseConverter.getCheckAndMutateResult(r,
c.cellScanner())))
.call();
} else if (checkAndMutate.getAction() instanceof RowMutations) {
@@ -516,7 +517,8 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
(rn, rm) -> RequestConverter.buildMultiRequest(rn,
checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
- checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm,
nonceGroup, nonce),
+ checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm,
nonceGroup, nonce,
+ checkAndMutate.isQueryMetricsEnabled()),
resp -> resp))
.call();
} else {
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index f4ac525e5b9..22101845ea4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -99,6 +99,7 @@ public class Result implements ExtendedCellScannable,
ExtendedCellScanner {
*/
private int cellScannerIndex = INITIAL_CELLSCANNER_INDEX;
private RegionLoadStats stats;
+ private QueryMetrics metrics = null;
private final boolean readonly;
@@ -931,6 +932,11 @@ public class Result implements ExtendedCellScannable,
ExtendedCellScanner {
this.stats = loadStats;
}
+ @InterfaceAudience.Private
+ public void setMetrics(QueryMetrics metrics) {
+ this.metrics = metrics;
+ }
+
/**
* Returns the associated statistics about the region from which this was
returned. Can be
* <tt>null</tt> if stats are disabled.
@@ -939,6 +945,11 @@ public class Result implements ExtendedCellScannable,
ExtendedCellScanner {
return stats;
}
+ /** Returns the query metrics, or {@code null} if we do not enable metrics.
*/
+ public QueryMetrics getMetrics() {
+ return metrics;
+ }
+
/**
* All methods modifying state of Result object must call this method to
ensure that special
* purpose immutable Results can't be accidentally modified.
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index c2caac844a8..62a65e4e6e1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -217,6 +217,7 @@ public class Scan extends Query {
setPriority(scan.getPriority());
readType = scan.getReadType();
super.setReplicaId(scan.getReplicaId());
+ super.setQueryMetricsEnabled(scan.isQueryMetricsEnabled());
}
/**
@@ -249,6 +250,7 @@ public class Scan extends Query {
this.mvccReadPoint = -1L;
setPriority(get.getPriority());
super.setReplicaId(get.getReplicaId());
+ super.setQueryMetricsEnabled(get.isQueryMetricsEnabled());
}
public boolean isGetScan() {
@@ -826,6 +828,7 @@ public class Scan extends Query {
map.put("colFamTimeRangeMap", colFamTimeRangeMapStr);
}
map.put("priority", getPriority());
+ map.put("queryMetricsEnabled", queryMetricsEnabled);
return map;
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 465d9e95ae8..616e0e37457 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.client.LogEntry;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.OnlineLogRecord;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.QueryMetrics;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLoadStats;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -659,6 +660,7 @@ public final class ProtobufUtil {
if (proto.hasLoadColumnFamiliesOnDemand()) {
get.setLoadColumnFamiliesOnDemand(proto.getLoadColumnFamiliesOnDemand());
}
+ get.setQueryMetricsEnabled(proto.getQueryMetricsEnabled());
return get;
}
@@ -1096,6 +1098,7 @@ public final class ProtobufUtil {
if (scan.isNeedCursorResult()) {
scanBuilder.setNeedCursorResult(true);
}
+ scanBuilder.setQueryMetricsEnabled(scan.isQueryMetricsEnabled());
return scanBuilder.build();
}
@@ -1200,6 +1203,7 @@ public final class ProtobufUtil {
if (proto.getNeedCursorResult()) {
scan.setNeedCursorResult(true);
}
+ scan.setQueryMetricsEnabled(proto.getQueryMetricsEnabled());
return scan;
}
@@ -1279,6 +1283,7 @@ public final class ProtobufUtil {
if (loadColumnFamiliesOnDemand != null) {
builder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand);
}
+ builder.setQueryMetricsEnabled(get.isQueryMetricsEnabled());
return builder.build();
}
@@ -1434,6 +1439,10 @@ public final class ProtobufUtil {
builder.setStale(result.isStale());
builder.setPartial(result.mayHaveMoreCellsInRow());
+ if (result.getMetrics() != null) {
+ builder.setMetrics(toQueryMetrics(result.getMetrics()));
+ }
+
return builder.build();
}
@@ -1463,6 +1472,9 @@ public final class ProtobufUtil {
ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
builder.setAssociatedCellCount(size);
builder.setStale(result.isStale());
+ if (result.getMetrics() != null) {
+ builder.setMetrics(toQueryMetrics(result.getMetrics()));
+ }
return builder.build();
}
@@ -1503,7 +1515,11 @@ public final class ProtobufUtil {
for (CellProtos.Cell c : values) {
cells.add(toCell(builder, c, decodeTags));
}
- return Result.create(cells, null, proto.getStale(), proto.getPartial());
+ Result r = Result.create(cells, null, proto.getStale(),
proto.getPartial());
+ if (proto.hasMetrics()) {
+ r.setMetrics(toQueryMetrics(proto.getMetrics()));
+ }
+ return r;
}
/**
@@ -1548,9 +1564,15 @@ public final class ProtobufUtil {
}
}
- return (cells == null || cells.isEmpty())
+ Result r = (cells == null || cells.isEmpty())
? (proto.getStale() ? EMPTY_RESULT_STALE : EMPTY_RESULT)
: Result.create(cells, null, proto.getStale());
+
+ if (proto.hasMetrics()) {
+ r.setMetrics(toQueryMetrics(proto.getMetrics()));
+ }
+
+ return r;
}
/**
@@ -3615,6 +3637,7 @@ public final class ProtobufUtil {
? ProtobufUtil.toTimeRange(condition.getTimeRange())
: TimeRange.allTime();
builder.timeRange(timeRange);
+ builder.queryMetricsEnabled(condition.getQueryMetricsEnabled());
try {
MutationType type = mutation.getMutateType();
@@ -3652,6 +3675,7 @@ public final class ProtobufUtil {
? ProtobufUtil.toTimeRange(condition.getTimeRange())
: TimeRange.allTime();
builder.timeRange(timeRange);
+ builder.queryMetricsEnabled(condition.getQueryMetricsEnabled());
try {
if (mutations.size() == 1) {
@@ -3678,7 +3702,7 @@ public final class ProtobufUtil {
public static ClientProtos.Condition toCondition(final byte[] row, final
byte[] family,
final byte[] qualifier, final CompareOperator op, final byte[] value,
final Filter filter,
- final TimeRange timeRange) throws IOException {
+ final TimeRange timeRange, boolean queryMetricsEnabled) throws IOException
{
ClientProtos.Condition.Builder builder =
ClientProtos.Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row));
@@ -3693,18 +3717,19 @@ public final class ProtobufUtil {
.setCompareType(HBaseProtos.CompareType.valueOf(op.name()));
}
+ builder.setQueryMetricsEnabled(queryMetricsEnabled);
return builder.setTimeRange(ProtobufUtil.toTimeRange(timeRange)).build();
}
public static ClientProtos.Condition toCondition(final byte[] row, final
Filter filter,
final TimeRange timeRange) throws IOException {
- return toCondition(row, null, null, null, null, filter, timeRange);
+ return toCondition(row, null, null, null, null, filter, timeRange, false);
}
public static ClientProtos.Condition toCondition(final byte[] row, final
byte[] family,
final byte[] qualifier, final CompareOperator op, final byte[] value,
final TimeRange timeRange)
throws IOException {
- return toCondition(row, family, qualifier, op, value, null, timeRange);
+ return toCondition(row, family, qualifier, op, value, null, timeRange,
false);
}
public static List<LogEntry> toBalancerDecisionResponse(HBaseProtos.LogEntry
logEntry) {
@@ -3823,6 +3848,15 @@ public final class ProtobufUtil {
.setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTime()).build();
}
+ public static ClientProtos.QueryMetrics toQueryMetrics(QueryMetrics metrics)
{
+ return ClientProtos.QueryMetrics.newBuilder()
+ .setBlockBytesScanned(metrics.getBlockBytesScanned()).build();
+ }
+
+ public static QueryMetrics toQueryMetrics(ClientProtos.QueryMetrics metrics)
{
+ return new QueryMetrics(metrics.getBlockBytesScanned());
+ }
+
/**
* Check whether this IPBE indicates EOF or not.
* <p/>
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index b98094ad92a..3bbfac500ce 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -201,7 +201,7 @@ public final class RequestConverter {
public static MutateRequest buildMutateRequest(final byte[] regionName,
final byte[] row,
final byte[] family, final byte[] qualifier, final CompareOperator op,
final byte[] value,
final Filter filter, final TimeRange timeRange, final Mutation mutation,
long nonceGroup,
- long nonce) throws IOException {
+ long nonce, boolean queryMetricsEnabled) throws IOException {
MutateRequest.Builder builder = MutateRequest.newBuilder();
if (mutation instanceof Increment || mutation instanceof Append) {
builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation),
mutation, nonce))
@@ -210,7 +210,8 @@ public final class RequestConverter {
builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation),
mutation));
}
return
builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
regionName))
- .setCondition(ProtobufUtil.toCondition(row, family, qualifier, op,
value, filter, timeRange))
+ .setCondition(ProtobufUtil.toCondition(row, family, qualifier, op,
value, filter, timeRange,
+ queryMetricsEnabled))
.build();
}
@@ -221,10 +222,10 @@ public final class RequestConverter {
public static ClientProtos.MultiRequest buildMultiRequest(final byte[]
regionName,
final byte[] row, final byte[] family, final byte[] qualifier, final
CompareOperator op,
final byte[] value, final Filter filter, final TimeRange timeRange,
- final RowMutations rowMutations, long nonceGroup, long nonce) throws
IOException {
- return buildMultiRequest(regionName, rowMutations,
- ProtobufUtil.toCondition(row, family, qualifier, op, value, filter,
timeRange), nonceGroup,
- nonce);
+ final RowMutations rowMutations, long nonceGroup, long nonce, boolean
queryMetricsEnabled)
+ throws IOException {
+ return buildMultiRequest(regionName, rowMutations,
ProtobufUtil.toCondition(row, family,
+ qualifier, op, value, filter, timeRange, queryMetricsEnabled),
nonceGroup, nonce);
}
/**
@@ -559,9 +560,9 @@ public final class RequestConverter {
getRegionActionBuilderWithRegion(builder, regionName);
CheckAndMutate cam = (CheckAndMutate) action.getAction();
- builder
- .setCondition(ProtobufUtil.toCondition(cam.getRow(), cam.getFamily(),
cam.getQualifier(),
- cam.getCompareOp(), cam.getValue(), cam.getFilter(),
cam.getTimeRange()));
+ builder.setCondition(ProtobufUtil.toCondition(cam.getRow(),
cam.getFamily(),
+ cam.getQualifier(), cam.getCompareOp(), cam.getValue(),
cam.getFilter(), cam.getTimeRange(),
+ cam.isQueryMetricsEnabled()));
if (cam.getAction() instanceof Put) {
actionBuilder.clear();
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
index 09cbc460f22..467560c44f7 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
+import org.apache.hadoop.hbase.client.QueryMetrics;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.SingleResponse;
@@ -194,6 +195,7 @@ public final class ResponseConverter {
private static CheckAndMutateResult
getCheckAndMutateResult(RegionActionResult actionResult,
CellScanner cells) throws IOException {
Result result = null;
+ QueryMetrics metrics = null;
if (actionResult.getResultOrExceptionCount() > 0) {
// Get the result of the Increment/Append operations from the first
element of the
// ResultOrException list
@@ -204,8 +206,12 @@ public final class ResponseConverter {
result = r;
}
}
+
+ if (roe.hasMetrics()) {
+ metrics = ProtobufUtil.toQueryMetrics(roe.getMetrics());
+ }
}
- return new CheckAndMutateResult(actionResult.getProcessed(), result);
+ return new CheckAndMutateResult(actionResult.getProcessed(),
result).setMetrics(metrics);
}
private static Result getMutateRowResult(RegionActionResult actionResult,
CellScanner cells)
@@ -240,10 +246,16 @@ public final class ResponseConverter {
ClientProtos.MutateResponse mutateResponse, CellScanner cells) throws
IOException {
boolean success = mutateResponse.getProcessed();
Result result = null;
+ QueryMetrics metrics = null;
if (mutateResponse.hasResult()) {
result = ProtobufUtil.toResult(mutateResponse.getResult(), cells);
}
- return new CheckAndMutateResult(success, result);
+
+ if (mutateResponse.hasMetrics()) {
+ metrics = ProtobufUtil.toQueryMetrics(mutateResponse.getMetrics());
+ }
+
+ return new CheckAndMutateResult(success, result).setMetrics(metrics);
}
/**
@@ -417,6 +429,7 @@ public final class ResponseConverter {
int noOfResults =
cellScanner != null ? response.getCellsPerResultCount() :
response.getResultsCount();
Result[] results = new Result[noOfResults];
+ List<ClientProtos.QueryMetrics> queryMetrics =
response.getQueryMetricsList();
for (int i = 0; i < noOfResults; i++) {
if (cellScanner != null) {
// Cells are out in cellblocks. Group them up again as Results. How
many to read at a
@@ -453,6 +466,12 @@ public final class ResponseConverter {
// Result is pure pb.
results[i] = ProtobufUtil.toResult(response.getResults(i));
}
+
+ // Populate result metrics if they exist
+ if (queryMetrics.size() > i) {
+ QueryMetrics metrics =
ProtobufUtil.toQueryMetrics(queryMetrics.get(i));
+ results[i].setMetrics(metrics);
+ }
}
return results;
}
diff --git
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestGet.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestGet.java
index ca1a708e64f..e6eed365cd0 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestGet.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestGet.java
@@ -182,6 +182,7 @@ public class TestGet {
get.setMaxResultsPerColumnFamily(10);
get.setRowOffsetPerColumnFamily(11);
get.setCacheBlocks(true);
+ get.setQueryMetricsEnabled(true);
Get copyGet = new Get(get);
assertEquals(0, Bytes.compareTo(get.getRow(), copyGet.getRow()));
@@ -196,6 +197,7 @@ public class TestGet {
assertEquals(get.getConsistency(), copyGet.getConsistency());
assertEquals(get.getReplicaId(), copyGet.getReplicaId());
assertEquals(get.getIsolationLevel(), copyGet.getIsolationLevel());
+ assertTrue(get.isQueryMetricsEnabled());
// from Get class
assertEquals(get.isCheckExistenceOnly(), copyGet.isCheckExistenceOnly());
diff --git
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java
index 72013b6f294..4446ab0f726 100644
---
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java
+++
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java
@@ -44,21 +44,21 @@ public class TestOnlineLogRecord {
Scan scan = new Scan();
scan.withStartRow(Bytes.toBytes(123));
scan.withStopRow(Bytes.toBytes(456));
- String expectedOutput =
- "{\n" + " \"startTime\": 1,\n" + " \"processingTime\": 2,\n" + "
\"queueTime\": 3,\n"
- + " \"responseSize\": 4,\n" + " \"blockBytesScanned\": 5,\n" + "
\"fsReadTime\": 6,\n"
- + " \"multiGetsCount\": 6,\n" + " \"multiMutationsCount\": 7,\n" + "
\"scan\": {\n"
- + " \"startRow\": \"\\\\x00\\\\x00\\\\x00{\",\n" + "
\"targetReplicaId\": -1,\n"
- + " \"batch\": -1,\n" + " \"totalColumns\": 0,\n" + "
\"maxResultSize\": -1,\n"
- + " \"families\": {},\n" + " \"priority\": -1,\n" + "
\"caching\": -1,\n"
- + " \"includeStopRow\": false,\n" + " \"consistency\":
\"STRONG\",\n"
- + " \"maxVersions\": 1,\n" + " \"storeOffset\": 0,\n" + "
\"mvccReadPoint\": -1,\n"
- + " \"includeStartRow\": true,\n" + " \"needCursorResult\":
false,\n"
- + " \"stopRow\": \"\\\\x00\\\\x00\\\\x01\\\\xC8\",\n" + "
\"storeLimit\": -1,\n"
- + " \"limit\": -1,\n" + " \"cacheBlocks\": true,\n"
- + " \"readType\": \"DEFAULT\",\n" + " \"allowPartialResults\":
false,\n"
- + " \"reversed\": false,\n" + " \"timeRange\": [\n" + "
0,\n"
- + " 9223372036854775807\n" + " ]\n" + " }\n" + "}";
+ String expectedOutput = "{\n" + " \"startTime\": 1,\n" + "
\"processingTime\": 2,\n"
+ + " \"queueTime\": 3,\n" + " \"responseSize\": 4,\n" + "
\"blockBytesScanned\": 5,\n"
+ + " \"fsReadTime\": 6,\n" + " \"multiGetsCount\": 6,\n" + "
\"multiMutationsCount\": 7,\n"
+ + " \"scan\": {\n" + " \"totalColumns\": 0,\n" + "
\"maxResultSize\": -1,\n"
+ + " \"caching\": -1,\n" + " \"includeStopRow\": false,\n"
+ + " \"consistency\": \"STRONG\",\n" + " \"maxVersions\": 1,\n"
+ + " \"mvccReadPoint\": -1,\n" + " \"includeStartRow\": true,\n"
+ + " \"stopRow\": \"\\\\x00\\\\x00\\\\x01\\\\xC8\",\n" + "
\"limit\": -1,\n"
+ + " \"timeRange\": [\n" + " 0,\n" + "
9223372036854775807\n" + " ],\n"
+ + " \"startRow\": \"\\\\x00\\\\x00\\\\x00{\",\n" + "
\"targetReplicaId\": -1,\n"
+ + " \"batch\": -1,\n" + " \"families\": {},\n" + "
\"priority\": -1,\n"
+ + " \"storeOffset\": 0,\n" + " \"queryMetricsEnabled\": false,\n"
+ + " \"needCursorResult\": false,\n" + " \"storeLimit\": -1,\n"
+ + " \"cacheBlocks\": true,\n" + " \"readType\": \"DEFAULT\",\n"
+ + " \"allowPartialResults\": false,\n" + " \"reversed\": false\n"
+ " }\n" + "}";
OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, 6, null, null,
null, null, null, null,
null, 6, 7, 0, scan, Collections.emptyMap(), Collections.emptyMap());
String actualOutput = o.toJsonPrettyPrint();
diff --git
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestScan.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestScan.java
index 4b124c68f86..40b2263afb4 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestScan.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestScan.java
@@ -76,7 +76,8 @@ public class TestScan {
.setAttribute("att_v0", Bytes.toBytes("att_v0"))
.setColumnFamilyTimeRange(Bytes.toBytes("cf"), 0, 123).setReplicaId(3)
.setACL("test_user", new Permission(Permission.Action.READ))
- .setAuthorizations(new Authorizations("test_label")).setPriority(3);
+ .setAuthorizations(new
Authorizations("test_label")).setQueryMetricsEnabled(true)
+ .setPriority(3);
Scan scan = new Scan(get);
assertEquals(get.getCacheBlocks(), scan.getCacheBlocks());
@@ -100,6 +101,7 @@ public class TestScan {
assertEquals(get.getACL(), scan.getACL());
assertEquals(get.getAuthorizations().getLabels(),
scan.getAuthorizations().getLabels());
assertEquals(get.getPriority(), scan.getPriority());
+ assertEquals(get.isQueryMetricsEnabled(), scan.isQueryMetricsEnabled());
}
@Test
@@ -216,7 +218,7 @@ public class TestScan {
.setReplicaId(3).setReversed(true).setRowOffsetPerColumnFamily(5)
.setStartStopRowForPrefixScan(Bytes.toBytes("row_")).setScanMetricsEnabled(true)
.setReadType(ReadType.STREAM).withStartRow(Bytes.toBytes("row_1"))
- .withStopRow(Bytes.toBytes("row_2")).setTimeRange(0, 13);
+ .withStopRow(Bytes.toBytes("row_2")).setTimeRange(0,
13).setQueryMetricsEnabled(true);
// create a copy of existing scan object
Scan scanCopy = new Scan(scan);
@@ -252,6 +254,7 @@ public class TestScan {
assertEquals(scan.getStartRow(), scanCopy.getStartRow());
assertEquals(scan.getStopRow(), scanCopy.getStopRow());
assertEquals(scan.getTimeRange(), scanCopy.getTimeRange());
+ assertEquals(scan.isQueryMetricsEnabled(),
scanCopy.isQueryMetricsEnabled());
assertTrue("Make sure copy constructor adds all the fields in the copied
object",
EqualsBuilder.reflectionEquals(scan, scanCopy));
diff --git
a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
index 02476ad1b96..57d7a7a0386 100644
---
a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
+++
b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
@@ -132,6 +132,7 @@ public class TestProtobufUtil {
getBuilder.setMaxVersions(1);
getBuilder.setCacheBlocks(true);
getBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime()));
+ getBuilder.setQueryMetricsEnabled(false);
Get get = ProtobufUtil.toGet(proto);
assertEquals(getBuilder.build(), ProtobufUtil.toGet(get));
}
@@ -260,6 +261,7 @@ public class TestProtobufUtil {
scanBuilder.setCaching(1024);
scanBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime()));
scanBuilder.setIncludeStopRow(false);
+ scanBuilder.setQueryMetricsEnabled(false);
ClientProtos.Scan expectedProto = scanBuilder.build();
ClientProtos.Scan actualProto =
ProtobufUtil.toScan(ProtobufUtil.toScan(expectedProto));
diff --git a/hbase-protocol-shaded/src/main/protobuf/client/Client.proto
b/hbase-protocol-shaded/src/main/protobuf/client/Client.proto
index 8a2988abf4a..874df527fc8 100644
--- a/hbase-protocol-shaded/src/main/protobuf/client/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/client/Client.proto
@@ -90,6 +90,7 @@ message Get {
optional Consistency consistency = 12 [default = STRONG];
repeated ColumnFamilyTimeRange cf_time_range = 13;
optional bool load_column_families_on_demand = 14; /* DO NOT add defaults to
load_column_families_on_demand. */
+ optional bool query_metrics_enabled = 15 [default = false];
}
message Result {
@@ -117,6 +118,9 @@ message Result {
// to form a complete result. The equivalent flag in o.a.h.h.client.Result is
// mayHaveMoreCellsInRow.
optional bool partial = 5 [default = false];
+
+ // Server side metrics about the result
+ optional QueryMetrics metrics = 6;
}
/**
@@ -145,6 +149,7 @@ message Condition {
optional Comparator comparator = 5;
optional TimeRange time_range = 6;
optional Filter filter = 7;
+ optional bool queryMetricsEnabled = 8;
}
@@ -233,6 +238,7 @@ message MutateResponse {
// used for mutate to indicate processed only
optional bool processed = 2;
+ optional QueryMetrics metrics = 3;
}
/**
@@ -274,6 +280,7 @@ message Scan {
}
optional ReadType readType = 23 [default = DEFAULT];
optional bool need_cursor_result = 24 [default = false];
+ optional bool query_metrics_enabled = 25 [default = false];
}
/**
@@ -366,6 +373,9 @@ message ScanResponse {
// If the Scan need cursor, return the row key we are scanning in heartbeat
message.
// If the Scan doesn't need a cursor, don't set this field to reduce network
IO.
optional Cursor cursor = 12;
+
+ // List of QueryMetrics that maps 1:1 to the results in the response based
on index
+ repeated QueryMetrics query_metrics = 13;
}
/**
@@ -458,6 +468,13 @@ message RegionAction {
optional Condition condition = 4;
}
+/*
+* Statistics about the Result's server-side metrics
+*/
+message QueryMetrics {
+ optional uint64 block_bytes_scanned = 1;
+}
+
/*
* Statistics about the current load on the region
*/
@@ -491,6 +508,7 @@ message ResultOrException {
optional CoprocessorServiceResult service_result = 4;
// current load on the region
optional RegionLoadStats loadStats = 5 [deprecated=true];
+ optional QueryMetrics metrics = 6;
}
/**
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c79bdcab821..3ff1227db5f 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -114,6 +114,7 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.QueryMetrics;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
@@ -5129,7 +5130,8 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver, Regi
// we'll get the latest on this row.
boolean matches = false;
long cellTs = 0;
- try (RegionScanner scanner = getScanner(new Scan(get))) {
+ QueryMetrics metrics = null;
+ try (RegionScannerImpl scanner = getScanner(new Scan(get))) {
// NOTE: Please don't use HRegion.get() instead,
// because it will copy cells to heap. See HBASE-26036
List<ExtendedCell> result = new ArrayList<>(1);
@@ -5154,6 +5156,9 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver, Regi
matches = matches(op, compareResult);
}
}
+ if (checkAndMutate.isQueryMetricsEnabled()) {
+ metrics = new
QueryMetrics(scanner.getContext().getBlockSizeProgress());
+ }
}
// If matches, perform the mutation or the rowMutations
@@ -5190,10 +5195,10 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver, Regi
r = mutateRow(rowMutations, nonceGroup, nonce);
}
this.checkAndMutateChecksPassed.increment();
- return new CheckAndMutateResult(true, r);
+ return new CheckAndMutateResult(true, r).setMetrics(metrics);
}
this.checkAndMutateChecksFailed.increment();
- return new CheckAndMutateResult(false, null);
+ return new CheckAndMutateResult(false, null).setMetrics(metrics);
} finally {
rowLock.release();
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 53b7e5ede37..5bc071eaa66 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.QueryMetrics;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
@@ -2567,6 +2568,7 @@ public class RSRpcServices extends
HBaseRpcServicesBase<HRegionServer>
scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
}
RegionScannerImpl scanner = null;
+ long blockBytesScannedBefore = context.getBlockBytesScanned();
try {
scanner = region.getScanner(scan);
scanner.next(results);
@@ -2594,7 +2596,13 @@ public class RSRpcServices extends
HBaseRpcServicesBase<HRegionServer>
}
region.metricsUpdateForGet();
- return Result.create(results, get.isCheckExistenceOnly() ?
!results.isEmpty() : null, stale);
+ Result r =
+ Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() :
null, stale);
+ if (get.isQueryMetricsEnabled()) {
+ long blockBytesScanned = context.getBlockBytesScanned() -
blockBytesScannedBefore;
+ r.setMetrics(new QueryMetrics(blockBytesScanned));
+ }
+ return r;
}
private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws
ServiceException {
@@ -2795,6 +2803,12 @@ public class RSRpcServices extends
HBaseRpcServicesBase<HRegionServer>
if (result.getResult() != null) {
resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
}
+
+ if (result.getMetrics() != null) {
+ resultOrExceptionOrBuilder
+
.setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics()));
+ }
+
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
} else {
CheckAndMutateResult result = checkAndMutate(region,
regionAction.getActionList(),
@@ -2962,6 +2976,9 @@ public class RSRpcServices extends
HBaseRpcServicesBase<HRegionServer>
if (clientCellBlockSupported) {
addSize(context, result.getResult());
}
+ if (result.getMetrics() != null) {
+ builder.setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics()));
+ }
} else {
Result r = null;
Boolean processed = null;
@@ -3376,6 +3393,7 @@ public class RSRpcServices extends
HBaseRpcServicesBase<HRegionServer>
contextBuilder.setTrackMetrics(trackMetrics);
ScannerContext scannerContext = contextBuilder.build();
boolean limitReached = false;
+ long blockBytesScannedBefore = 0;
while (numOfResults < maxResults) {
// Reset the batch progress to 0 before every call to
RegionScanner#nextRaw. The
// batch limit is a limit on the number of cells per Result. Thus,
if progress is
@@ -3387,6 +3405,10 @@ public class RSRpcServices extends
HBaseRpcServicesBase<HRegionServer>
// Collect values to be returned here
moreRows = scanner.nextRaw(values, scannerContext);
+
+ long blockBytesScanned = scannerContext.getBlockSizeProgress() -
blockBytesScannedBefore;
+ blockBytesScannedBefore = scannerContext.getBlockSizeProgress();
+
if (rpcCall == null) {
// When there is no RpcCallContext,copy EC to heap, then the
scanner would close,
// This can be an EXPENSIVE call. It may make an extra copy from
offheap to onheap
@@ -3425,6 +3447,12 @@ public class RSRpcServices extends
HBaseRpcServicesBase<HRegionServer>
}
boolean mayHaveMoreCellsInRow =
scannerContext.mayHaveMoreCellsInRow();
Result r = Result.create(values, null, stale,
mayHaveMoreCellsInRow);
+
+ if (request.getScan().getQueryMetricsEnabled()) {
+ builder.addQueryMetrics(ClientProtos.QueryMetrics.newBuilder()
+ .setBlockBytesScanned(blockBytesScanned).build());
+ }
+
results.add(r);
numOfResults++;
if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
index 36acf678eba..19b54213a5d 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
@@ -147,6 +147,10 @@ class RegionScannerImpl implements RegionScanner, Shipper,
RpcCallback {
initializeScanners(scan, additionalScanners);
}
+ public ScannerContext getContext() {
+ return defaultScannerContext;
+ }
+
private void initializeScanners(Scan scan, List<KeyValueScanner>
additionalScanners)
throws IOException {
// Here we separate all scanners into two lists - scanner that provide
data required
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableQueryMetrics.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableQueryMetrics.java
new file mode 100644
index 00000000000..56f36896c2c
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableQueryMetrics.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceImpl;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableQueryMetrics {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncTableQueryMetrics.class);
+
+ private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+ private static final TableName TABLE_NAME =
TableName.valueOf("ResultMetrics");
+
+ private static final byte[] CF = Bytes.toBytes("cf");
+
+ private static final byte[] CQ = Bytes.toBytes("cq");
+
+ private static final byte[] VALUE = Bytes.toBytes("value");
+
+ private static final byte[] ROW_1 = Bytes.toBytes("zzz1");
+ private static final byte[] ROW_2 = Bytes.toBytes("zzz2");
+ private static final byte[] ROW_3 = Bytes.toBytes("zzz3");
+
+ private static AsyncConnection CONN;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UTIL.startMiniCluster(3);
+ // Create 3 rows in the table, with rowkeys starting with "zzz*" so that
+ // scan are forced to hit all the regions.
+ try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
+ table.put(Arrays.asList(new Put(ROW_1).addColumn(CF, CQ, VALUE),
+ new Put(ROW_2).addColumn(CF, CQ, VALUE), new Put(ROW_3).addColumn(CF,
CQ, VALUE)));
+ }
+ CONN =
ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+ CONN.getAdmin().flush(TABLE_NAME).join();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ Closeables.close(CONN, true);
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void itTestsGets() throws Exception {
+ // Test a single Get
+ Get g1 = new Get(ROW_1);
+ g1.setQueryMetricsEnabled(true);
+
+ long bbs = getClusterBlockBytesScanned();
+ Result result = CONN.getTable(TABLE_NAME).get(g1).get();
+ bbs += result.getMetrics().getBlockBytesScanned();
+ Assert.assertNotNull(result.getMetrics());
+ Assert.assertEquals(getClusterBlockBytesScanned(), bbs);
+
+ // Test multigets
+ Get g2 = new Get(ROW_2);
+ g2.setQueryMetricsEnabled(true);
+
+ Get g3 = new Get(ROW_3);
+ g3.setQueryMetricsEnabled(true);
+
+ List<CompletableFuture<Result>> futures =
CONN.getTable(TABLE_NAME).get(List.of(g1, g2, g3));
+
+ for (CompletableFuture<Result> future : futures) {
+ result = future.join();
+ Assert.assertNotNull(result.getMetrics());
+ bbs += result.getMetrics().getBlockBytesScanned();
+ }
+
+ Assert.assertEquals(getClusterBlockBytesScanned(), bbs);
+ }
+
+ @Test
+ public void itTestsDefaultGetNoMetrics() throws Exception {
+ // Test a single Get
+ Get g1 = new Get(ROW_1);
+
+ Result result = CONN.getTable(TABLE_NAME).get(g1).get();
+ Assert.assertNull(result.getMetrics());
+
+ // Test multigets
+ Get g2 = new Get(ROW_2);
+ Get g3 = new Get(ROW_3);
+ List<CompletableFuture<Result>> futures =
CONN.getTable(TABLE_NAME).get(List.of(g1, g2, g3));
+ futures.forEach(f -> Assert.assertNull(f.join().getMetrics()));
+
+ }
+
+ @Test
+ public void itTestsScans() {
+ Scan scan = new Scan();
+ scan.setQueryMetricsEnabled(true);
+
+ long bbs = getClusterBlockBytesScanned();
+ try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(scan)) {
+ for (Result result : scanner) {
+ Assert.assertNotNull(result.getMetrics());
+ bbs += result.getMetrics().getBlockBytesScanned();
+ Assert.assertEquals(getClusterBlockBytesScanned(), bbs);
+ }
+ }
+ }
+
+ @Test
+ public void itTestsDefaultScanNoMetrics() {
+ Scan scan = new Scan();
+
+ try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(scan)) {
+ for (Result result : scanner) {
+ Assert.assertNull(result.getMetrics());
+ }
+ }
+ }
+
+ @Test
+ public void itTestsAtomicOperations() {
+ CheckAndMutate cam = CheckAndMutate.newBuilder(ROW_1).ifEquals(CF, CQ,
VALUE)
+ .queryMetricsEnabled(true).build(new Put(ROW_1).addColumn(CF, CQ,
VALUE));
+
+ long bbs = getClusterBlockBytesScanned();
+ CheckAndMutateResult result =
CONN.getTable(TABLE_NAME).checkAndMutate(cam).join();
+ QueryMetrics metrics = result.getMetrics();
+
+ Assert.assertNotNull(metrics);
+ Assert.assertEquals(getClusterBlockBytesScanned(), bbs +
metrics.getBlockBytesScanned());
+
+ bbs = getClusterBlockBytesScanned();
+ List<CheckAndMutate> batch = new ArrayList<>();
+ batch.add(cam);
+
batch.add(CheckAndMutate.newBuilder(ROW_2).queryMetricsEnabled(true).ifEquals(CF,
CQ, VALUE)
+ .build(new Put(ROW_2).addColumn(CF, CQ, VALUE)));
+
batch.add(CheckAndMutate.newBuilder(ROW_3).queryMetricsEnabled(true).ifEquals(CF,
CQ, VALUE)
+ .build(new Put(ROW_3).addColumn(CF, CQ, VALUE)));
+
+ List<Object> res = CONN.getTable(TABLE_NAME).batchAll(batch).join();
+ long totalBbs = res.stream()
+ .mapToLong(r -> ((CheckAndMutateResult)
r).getMetrics().getBlockBytesScanned()).sum();
+ Assert.assertEquals(getClusterBlockBytesScanned(), bbs + totalBbs);
+
+ bbs = getClusterBlockBytesScanned();
+
+ // flush to force fetch from disk
+ CONN.getAdmin().flush(TABLE_NAME).join();
+ List<CompletableFuture<Object>> futures =
CONN.getTable(TABLE_NAME).batch(batch);
+
+ totalBbs = futures.stream().map(CompletableFuture::join)
+ .mapToLong(r -> ((CheckAndMutateResult)
r).getMetrics().getBlockBytesScanned()).sum();
+ Assert.assertEquals(getClusterBlockBytesScanned(), bbs + totalBbs);
+ }
+
+ @Test
+ public void itTestsDefaultAtomicOperations() {
+ CheckAndMutate cam = CheckAndMutate.newBuilder(ROW_1).ifEquals(CF, CQ,
VALUE)
+ .build(new Put(ROW_1).addColumn(CF, CQ, VALUE));
+
+ CheckAndMutateResult result =
CONN.getTable(TABLE_NAME).checkAndMutate(cam).join();
+ QueryMetrics metrics = result.getMetrics();
+
+ Assert.assertNull(metrics);
+
+ List<CheckAndMutate> batch = new ArrayList<>();
+ batch.add(cam);
+ batch.add(CheckAndMutate.newBuilder(ROW_2).ifEquals(CF, CQ, VALUE)
+ .build(new Put(ROW_2).addColumn(CF, CQ, VALUE)));
+ batch.add(CheckAndMutate.newBuilder(ROW_3).ifEquals(CF, CQ, VALUE)
+ .build(new Put(ROW_3).addColumn(CF, CQ, VALUE)));
+
+ List<Object> res = CONN.getTable(TABLE_NAME).batchAll(batch).join();
+ for (Object r : res) {
+ Assert.assertNull(((CheckAndMutateResult) r).getMetrics());
+ }
+
+ // flush to force fetch from disk
+ CONN.getAdmin().flush(TABLE_NAME).join();
+ List<CompletableFuture<Object>> futures =
CONN.getTable(TABLE_NAME).batch(batch);
+
+ for (CompletableFuture<Object> future : futures) {
+ Object r = future.join();
+ Assert.assertNull(((CheckAndMutateResult) r).getMetrics());
+ }
+ }
+
+ private static long getClusterBlockBytesScanned() {
+ long bbs = 0L;
+
+ for (JVMClusterUtil.RegionServerThread rs :
UTIL.getHBaseCluster().getRegionServerThreads()) {
+ MetricsRegionServer metrics = rs.getRegionServer().getMetrics();
+ MetricsRegionServerSourceImpl source =
+ (MetricsRegionServerSourceImpl) metrics.getMetricsSource();
+
+ bbs += source.getMetricsRegistry()
+ .getCounter(MetricsRegionServerSource.BLOCK_BYTES_SCANNED_KEY,
0L).value();
+ }
+
+ return bbs;
+ }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
index 37422c1f1a0..1497f359080 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
@@ -236,7 +236,7 @@ public class TestMalformedCellFromClient {
ClientProtos.Action.Builder actionBuilder =
ClientProtos.Action.newBuilder();
ClientProtos.MutationProto.Builder mutationBuilder =
ClientProtos.MutationProto.newBuilder();
ClientProtos.Condition condition = ProtobufUtil.toCondition(rm.getRow(),
FAMILY, null,
- CompareOperator.EQUAL, new byte[10], null, null);
+ CompareOperator.EQUAL, new byte[10], null, null, false);
for (Mutation mutation : rm.getMutations()) {
ClientProtos.MutationProto.MutationType mutateType = null;
if (mutation instanceof Put) {