Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.1 92d8cf271 -> b553f96a2
PHOENIX-3248 Enable HBase server-side scan metrics to be returned to client and surfaced through metrics (Karan Mehta) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b553f96a Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b553f96a Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b553f96a Branch: refs/heads/4.x-HBase-1.1 Commit: b553f96a21b603d02083cd1c32fca95036a80bb5 Parents: 92d8cf2 Author: Samarth Jain <[email protected]> Authored: Fri May 26 09:51:57 2017 -0700 Committer: Samarth Jain <[email protected]> Committed: Fri May 26 09:51:57 2017 -0700 ---------------------------------------------------------------------- .../DelayedTableResultIteratorFactory.java | 18 +++-- .../phoenix/monitoring/PhoenixMetricsIT.java | 35 ++++----- .../phoenix/iterate/ChunkedResultIterator.java | 11 +-- .../DefaultTableResultIteratorFactory.java | 12 ++-- .../phoenix/iterate/ParallelIterators.java | 11 ++- .../phoenix/iterate/ScanningResultIterator.java | 76 +++++++++++++------- .../apache/phoenix/iterate/SerialIterators.java | 13 +++- .../phoenix/iterate/TableResultIterator.java | 11 +-- .../iterate/TableResultIteratorFactory.java | 6 +- .../phoenix/mapreduce/PhoenixRecordReader.java | 16 +++-- .../phoenix/monitoring/GlobalClientMetrics.java | 7 +- .../apache/phoenix/monitoring/MetricType.java | 17 ++++- .../phoenix/monitoring/ReadMetricQueue.java | 4 ++ .../hive/mapreduce/PhoenixRecordReader.java | 11 +-- 14 files changed, 158 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java index 55bed91..5e13982 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java @@ -26,7 +26,7 @@ import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.MutationState; -import org.apache.phoenix.monitoring.CombinableMetric; +import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; @@ -39,14 +39,18 @@ public class DelayedTableResultIteratorFactory implements TableResultIteratorFac } @Override - public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, - CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { - return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper); + public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, + Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, + QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { + return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetricsHolder, + renewLeaseThreshold, plan, scanGrouper); } - + private class DelayedTableResultIterator extends TableResultIterator { - public DelayedTableResultIterator (MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { - super(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper); + public DelayedTableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, + ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, QueryPlan plan, + ParallelScanGrouper scanGrouper) throws SQLException { + super(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java index 2838f04..69ad1ff 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java @@ -76,12 +76,14 @@ import com.google.common.collect.Sets; public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { - private static final List<String> mutationMetricsToSkip = Lists - .newArrayList(MetricType.MUTATION_COMMIT_TIME.name()); - private static final List<String> readMetricsToSkip = Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME.name(), - MetricType.TASK_EXECUTION_TIME.name(), MetricType.TASK_END_TO_END_TIME.name()); + private static final List<String> mutationMetricsToSkip = + Lists.newArrayList(MetricType.MUTATION_COMMIT_TIME.name()); + private static final List<String> readMetricsToSkip = + Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME.name(), + MetricType.TASK_EXECUTION_TIME.name(), MetricType.TASK_END_TO_END_TIME.name(), + MetricType.COUNT_MILLS_BETWEEN_NEXTS.name()); private static final String CUSTOM_URL_STRING = "SESSION"; - private static final AtomicInteger numConnections = new AtomicInteger(0); + private static final AtomicInteger numConnections = new AtomicInteger(0); @BeforeClass public static void doSetup() throws Exception { @@ -230,7 +232,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { resultSetBeingTested.close(); Set<String> expectedTableNames = Sets.newHashSet(tableName); assertReadMetricValuesForSelectSql(Lists.newArrayList(numRows), Lists.newArrayList(numExpectedTasks), - resultSetBeingTested, expectedTableNames); + resultSetBeingTested, expectedTableNames); } @Test @@ -617,7 +619,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { assertMetricsHaveSameValues(metrics.get(index1), metrics.get(index3), mutationMetricsToSkip); } } - + @Test public void testOpenConnectionsCounter() throws Exception { long numOpenConnections = GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue(); @@ -626,7 +628,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { } assertEquals(numOpenConnections, GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue()); } - + private void createTableAndInsertValues(boolean commit, int numRows, Connection conn, String tableName) throws SQLException { String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)"; @@ -684,20 +686,14 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { String tableName = entry.getKey(); expectedTableNames.remove(tableName); Map<String, Long> metricValues = entry.getValue(); - boolean scanMetricsPresent = false; boolean taskCounterMetricsPresent = false; boolean taskExecutionTimeMetricsPresent = false; boolean memoryMetricsPresent = false; for (Entry<String, Long> pair : metricValues.entrySet()) { String metricName = pair.getKey(); long metricValue = pair.getValue(); - long n = numRows.get(counter); long numTask = numExpectedTasks.get(counter); - if (metricName.equals(SCAN_BYTES.name())) { - // we are using a SCAN_BYTES_DELTA of 1. So number of scan bytes read should be number of rows read - assertEquals(n, metricValue); - scanMetricsPresent = true; - } else if (metricName.equals(TASK_EXECUTED_COUNTER.name())) { + if (metricName.equals(TASK_EXECUTED_COUNTER.name())) { assertEquals(numTask, metricValue); taskCounterMetricsPresent = true; } else if (metricName.equals(TASK_EXECUTION_TIME.name())) { @@ -709,7 +705,6 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { } } counter++; - assertTrue(scanMetricsPresent); assertTrue(taskCounterMetricsPresent); assertTrue(taskExecutionTimeMetricsPresent); assertTrue(memoryMetricsPresent); @@ -822,7 +817,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { } } } - + @Test public void testGetConnectionsForSameUrlConcurrently() throws Exception { // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver @@ -940,7 +935,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { exec.shutdownNow(); } } - + @Test public void testGetConnectionsWithDifferentJDBCParamsConcurrently() throws Exception { DriverManager.registerDriver(PhoenixDriver.INSTANCE); @@ -977,9 +972,9 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { c.close(); } catch (Exception ignore) {} } - } + } } - + private static class GetConnectionCallable implements Callable<Connection> { private final String url; GetConnectionCallable(String url) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java index a12d40c..8595fd4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java @@ -19,7 +19,6 @@ package org.apache.phoenix.iterate; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX; -import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES; import java.sql.SQLException; import java.util.List; @@ -30,6 +29,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.monitoring.ReadMetricQueue; +import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.TableRef; @@ -144,12 +145,14 @@ public class ChunkedResultIterator implements PeekingResultIterator { } if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator over " + tableRef.getTable().getPhysicalName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan))); String tableName = tableRef.getTable().getPhysicalName().getString(); + ReadMetricQueue readMetrics = context.getReadMetricsQueue(); + ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan, + readMetrics.isRequestMetricsEnabled()); long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds(); ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(new TableResultIterator(mutationState, scan, - context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), - renewLeaseThreshold, plan, DefaultParallelScanGrouper.getInstance()), - chunkSize); + scanMetricsHolder, renewLeaseThreshold, plan, + DefaultParallelScanGrouper.getInstance()), chunkSize); resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName, plan); } return resultIterator; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java index b720b56..976b839 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java @@ -22,15 +22,17 @@ import java.sql.SQLException; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.MutationState; -import org.apache.phoenix.monitoring.CombinableMetric; +import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.schema.TableRef; public class DefaultTableResultIteratorFactory implements TableResultIteratorFactory { - @Override - public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, - CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { - return new TableResultIterator(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper); + @Override + public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, + Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, + QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { + return new TableResultIterator(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, + plan, scanGrouper); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index 8c9b689..f0360e2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -30,9 +30,9 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.job.JobManager.JobCallable; -import org.apache.phoenix.monitoring.CombinableMetric; import org.apache.phoenix.monitoring.MetricType; import org.apache.phoenix.monitoring.ReadMetricQueue; +import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.LogUtil; @@ -97,11 +97,16 @@ public class ParallelIterators extends BaseResultIterators { context.getOverallQueryMetrics().updateNumParallelScans(numScans); GLOBAL_NUM_PARALLEL_SCANS.update(numScans); final long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds(); + boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled(); for (final ScanLocator scanLocation : scanLocations) { final Scan scan = scanLocation.getScan(); - final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName); + final ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, physicalTableName, + scan, isRequestMetricsEnabled); final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName); - final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper); + final TableResultIterator tableResultItr = + context.getConnection().getTableResultIteratorFactory().newIterator( + mutationState, tableRef, scan, scanMetricsHolder, renewLeaseThreshold, plan, + scanGrouper); context.getConnection().addIteratorForLeaseRenewal(tableResultItr); Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java index 7f865ed..8ee00e9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java @@ -22,33 +22,74 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTE import java.io.IOException; import java.sql.SQLException; import java.util.List; +import java.util.Map; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.phoenix.monitoring.CombinableMetric; -import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric; -import org.apache.phoenix.monitoring.GlobalClientMetrics; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ServerUtil; public class ScanningResultIterator implements ResultIterator { private final ResultScanner scanner; - private final CombinableMetric scanMetrics; + private final Scan scan; + private final ScanMetricsHolder scanMetricsHolder; + boolean scanMetricsUpdated; + boolean scanMetricsEnabled; - public ScanningResultIterator(ResultScanner scanner, CombinableMetric scanMetrics) { + // These metric names are how HBase refers them + // Since HBase stores these strings as static final, we are using the same here + static final String RPC_CALLS_METRIC_NAME = "RPC_CALLS"; + static final String REMOTE_RPC_CALLS_METRIC_NAME = "REMOTE_RPC_CALLS"; + static final String MILLIS_BETWEEN_NEXTS_METRIC_NAME = "MILLIS_BETWEEN_NEXTS"; + static final String NOT_SERVING_REGION_EXCEPTION_METRIC_NAME = "NOT_SERVING_REGION_EXCEPTION"; + static final String BYTES_IN_RESULTS_METRIC_NAME = "BYTES_IN_RESULTS"; + static final String BYTES_IN_REMOTE_RESULTS_METRIC_NAME = "BYTES_IN_REMOTE_RESULTS"; + static final String REGIONS_SCANNED_METRIC_NAME = "REGIONS_SCANNED"; + static final String RPC_RETRIES_METRIC_NAME = "RPC_RETRIES"; + static final String REMOTE_RPC_RETRIES_METRIC_NAME = "REMOTE_RPC_RETRIES"; + static final String COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME = "ROWS_SCANNED"; + static final String COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME = "ROWS_FILTERED"; + static final String GLOBAL_BYTES_IN_RESULTS_METRIC_NAME = "BYTES_IN_RESULTS"; + + public ScanningResultIterator(ResultScanner scanner, Scan scan, ScanMetricsHolder scanMetricsHolder) { this.scanner = scanner; - this.scanMetrics = scanMetrics; + this.scan = scan; + this.scanMetricsHolder = scanMetricsHolder; + scanMetricsUpdated = false; + scanMetricsEnabled = scan.isScanMetricsEnabled(); } @Override public void close() throws SQLException { + getScanMetrics(); scanner.close(); } + private void getScanMetrics() { + + if (!scanMetricsUpdated && scanMetricsEnabled) { + Map<String, Long> scanMetricsMap = scan.getScanMetrics().getMetricsMap(); + scanMetricsHolder.getCountOfRPCcalls().change(scanMetricsMap.get(RPC_CALLS_METRIC_NAME)); + scanMetricsHolder.getCountOfRemoteRPCcalls().change(scanMetricsMap.get(REMOTE_RPC_CALLS_METRIC_NAME)); + scanMetricsHolder.getSumOfMillisSecBetweenNexts().change(scanMetricsMap.get(MILLIS_BETWEEN_NEXTS_METRIC_NAME)); + scanMetricsHolder.getCountOfNSRE().change(scanMetricsMap.get(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME)); + scanMetricsHolder.getCountOfBytesInResults().change(scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME)); + scanMetricsHolder.getCountOfBytesInRemoteResults().change(scanMetricsMap.get(BYTES_IN_REMOTE_RESULTS_METRIC_NAME)); + scanMetricsHolder.getCountOfRegions().change(scanMetricsMap.get(REGIONS_SCANNED_METRIC_NAME)); + scanMetricsHolder.getCountOfRPCRetries().change(scanMetricsMap.get(RPC_RETRIES_METRIC_NAME)); + scanMetricsHolder.getCountOfRemoteRPCRetries().change(scanMetricsMap.get(REMOTE_RPC_RETRIES_METRIC_NAME)); + scanMetricsHolder.getCountOfRowsScanned().change(scanMetricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); + scanMetricsHolder.getCountOfRowsFiltered().change(scanMetricsMap.get(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME)); + + GLOBAL_SCAN_BYTES.update(scanMetricsMap.get(GLOBAL_BYTES_IN_RESULTS_METRIC_NAME)); + scanMetricsUpdated = true; + } + + } + @Override public Tuple next() throws SQLException { try { @@ -57,7 +98,6 @@ public class ScanningResultIterator implements ResultIterator { close(); // Free up resources early return null; } - calculateScanSize(result); // TODO: use ResultTuple.setResult(result)? // Need to create a new one if holding on to it (i.e. OrderedResultIterator) return new ResultTuple(result); @@ -75,22 +115,6 @@ public class ScanningResultIterator implements ResultIterator { return "ScanningResultIterator [scanner=" + scanner + "]"; } - private void calculateScanSize(Result result) { - if (GlobalClientMetrics.isMetricsEnabled() || scanMetrics != NoOpRequestMetric.INSTANCE) { - if (result != null) { - Cell[] cells = result.rawCells(); - long scanResultSize = 0; - for (Cell cell : cells) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - scanResultSize += kv.heapSize(); - } - scanMetrics.change(scanResultSize); - GLOBAL_SCAN_BYTES.update(scanResultSize); - } - } - } - - public ResultScanner getScanner() { return scanner; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index d8f7f40..eb0c949 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -17,8 +17,6 @@ */ package org.apache.phoenix.iterate; -import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES; - import java.sql.SQLException; import java.util.Collections; import java.util.List; @@ -33,6 +31,8 @@ import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.job.JobManager.JobCallable; +import org.apache.phoenix.monitoring.ReadMetricQueue; +import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.query.QueryConstants; @@ -168,12 +168,19 @@ public class SerialIterators extends BaseResultIterators { if (index >= scans.size()) { return EMPTY_ITERATOR; } + ReadMetricQueue readMetrics = context.getReadMetricsQueue(); + boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled(); while (index < scans.size()) { Scan currentScan = scans.get(index++); if (remainingOffset != null) { currentScan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, PInteger.INSTANCE.toBytes(remainingOffset)); } - TableResultIterator itr = new TableResultIterator(mutationState, currentScan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold, plan, scanGrouper); + ScanMetricsHolder scanMetricsHolder = + ScanMetricsHolder.getInstance(readMetrics, tableName, currentScan, + isRequestMetricsEnabled); + TableResultIterator itr = + new TableResultIterator(mutationState, currentScan, scanMetricsHolder, + renewLeaseThreshold, plan, scanGrouper); PeekingResultIterator peekingItr = iteratorFactory.newIterator(context, itr, currentScan, tableName, plan); Tuple tuple; if ((tuple = peekingItr.peek()) == null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index c6fcc1d..f854996 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.monitoring.CombinableMetric; +import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.tuple.Tuple; @@ -64,7 +65,7 @@ import com.google.common.annotations.VisibleForTesting; public class TableResultIterator implements ResultIterator { private final Scan scan; private final HTableInterface htable; - private final CombinableMetric scanMetrics; + private final ScanMetricsHolder scanMetricsHolder; private static final ResultIterator UNINITIALIZED_SCANNER = ResultIterator.EMPTY_ITERATOR; private final long renewLeaseThreshold; private final QueryPlan plan; @@ -85,7 +86,7 @@ public class TableResultIterator implements ResultIterator { @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE! TableResultIterator() { - this.scanMetrics = null; + this.scanMetricsHolder = null; this.renewLeaseThreshold = 0; this.htable = null; this.scan = null; @@ -97,10 +98,10 @@ public class TableResultIterator implements ResultIterator { RENEWED, NOT_RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, LOCK_NOT_ACQUIRED, NOT_SUPPORTED }; - public TableResultIterator(MutationState mutationState, Scan scan, CombinableMetric scanMetrics, + public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { this.scan = scan; - this.scanMetrics = scanMetrics; + this.scanMetricsHolder = scanMetricsHolder; this.plan = plan; PTable table = plan.getTableRef().getTable(); htable = mutationState.getHTable(table); @@ -186,7 +187,7 @@ public class TableResultIterator implements ResultIterator { if (delegate == UNINITIALIZED_SCANNER) { try { this.scanIterator = - new ScanningResultIterator(htable.getScanner(scan), scanMetrics); + new ScanningResultIterator(htable.getScanner(scan), scan, scanMetricsHolder); } catch (IOException e) { Closeables.closeQuietly(htable); throw ServerUtil.parseServerException(e); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java index 8d7b54d..c23e342 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java @@ -23,8 +23,12 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.monitoring.CombinableMetric; +import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.schema.TableRef; public interface TableResultIteratorFactory { - public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException; + public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, + Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, + QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException; + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java index 17d9b6a..d4d6734 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java @@ -17,8 +17,6 @@ */ package org.apache.phoenix.mapreduce; -import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES; - import java.io.IOException; import java.sql.SQLException; import java.util.List; @@ -47,13 +45,13 @@ import org.apache.phoenix.iterate.SequenceResultIterator; import org.apache.phoenix.iterate.TableResultIterator; import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.monitoring.ReadMetricQueue; +import org.apache.phoenix.monitoring.ScanMetricsHolder; +import org.apache.phoenix.query.ConnectionQueryServices; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; -import org.apache.phoenix.query.ConnectionQueryServices; - /** * {@link RecordReader} implementation that iterates over the the records. */ @@ -119,10 +117,18 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null services.clearTableRegionCache(tableNameBytes); long renewScannerLeaseThreshold = queryPlan.getContext().getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds(); + boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled(); for (Scan scan : scans) { // For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599 scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true)); - final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext().getConnection().getMutationState(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance()); + ScanMetricsHolder scanMetricsHolder = + ScanMetricsHolder.getInstance(readMetrics, tableName, scan, + isRequestMetricsEnabled); + final TableResultIterator tableResultIterator = + new TableResultIterator( + queryPlan.getContext().getConnection().getMutationState(), scan, + scanMetricsHolder, renewScannerLeaseThreshold, queryPlan, + MapReduceParallelScanGrouper.getInstance()); PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator); iterators.add(peekingResultIterator); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java index b5f9422..6ba677a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java @@ -18,8 +18,6 @@ package org.apache.phoenix.monitoring; import static org.apache.phoenix.monitoring.MetricType.HCONNECTIONS_COUNTER; -import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER; -import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER; import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES; import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME; import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE; @@ -27,10 +25,11 @@ import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES; import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME; import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER; import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS; +import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER; import static org.apache.phoenix.monitoring.MetricType.QUERY_SERVICES_COUNTER; import static org.apache.phoenix.monitoring.MetricType.QUERY_TIME; import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER; -import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER; import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES; import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_COUNTER; import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_COUNTER; @@ -41,6 +40,8 @@ import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME; import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME; import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER; import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER; + import java.util.ArrayList; import java.util.Collection; import java.util.List; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java index 7b21de5..a18d4ce 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java @@ -46,8 +46,19 @@ public enum MetricType { HCONNECTIONS_COUNTER("Number of HConnections created by phoenix driver"), PHOENIX_CONNECTIONS_THROTTLED_COUNTER("Number of client Phoenix connections prevented from opening " + "because there are already too many to that target cluster."), - PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("Number of requests for Phoenix connections, whether successful or not."); - + PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("Number of requests for Phoenix connections, whether successful or not."), + COUNT_RPC_CALLS("Number of RPC calls"), + COUNT_REMOTE_RPC_CALLS("Number of remote RPC calls"), + COUNT_MILLS_BETWEEN_NEXTS("Sum of milliseconds between sequential next calls"), + COUNT_NOT_SERVING_REGION_EXCEPTION("Number of NotServingRegionException caught"), + COUNT_BYTES_REGION_SERVER_RESULTS("Number of bytes in Result objects from region servers"), + COUNT_BYTES_IN_REMOTE_RESULTS("Number of bytes in Result objects from remote region servers"), + COUNT_SCANNED_REGIONS("Number of regions scanned"), + COUNT_RPC_RETRIES("Number of RPC retries"), + COUNT_REMOTE_RPC_RETRIES("Number of remote RPC retries"), + COUNT_ROWS_SCANNED("Number of rows scanned"), + COUNT_ROWS_FILTERED("Number of rows filtered"); + private final String description; private MetricType(String description) { @@ -56,6 +67,6 @@ public enum MetricType { public String description() { return description; - } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java index e6c6be2..0e985ed 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java @@ -177,4 +177,8 @@ public class ReadMetricQueue { return q; } + public boolean isRequestMetricsEnabled() { + return isRequestMetricsEnabled; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b553f96a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java index ca27686..c10c4d1 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java @@ -17,8 +17,6 @@ */ package org.apache.phoenix.hive.mapreduce; -import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES; - import java.io.IOException; import java.sql.SQLException; import java.util.List; @@ -49,6 +47,7 @@ import org.apache.phoenix.iterate.SequenceResultIterator; import org.apache.phoenix.iterate.TableResultIterator; import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.monitoring.ReadMetricQueue; +import org.apache.phoenix.monitoring.ScanMetricsHolder; import com.google.common.base.Throwables; import com.google.common.collect.Lists; @@ -113,12 +112,14 @@ public class PhoenixRecordReader<T extends DBWritable> implements String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString(); long renewScannerLeaseThreshold = queryPlan.getContext().getConnection() .getQueryServices().getRenewLeaseThresholdMilliSeconds(); + boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled(); for (Scan scan : scans) { scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes .toBytes(true)); - final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan - .getContext().getConnection().getMutationState(), scan, - readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance() ); + ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan, isRequestMetricsEnabled); + final TableResultIterator tableResultIterator = new TableResultIterator( + queryPlan.getContext().getConnection().getMutationState(), scan, scanMetricsHolder, + renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance()); PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap (tableResultIterator);
