This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch request-processing-info in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 74d4dcb9da103ea506cb0f142daf665f3b9b24ab Author: Subbu Subramaniam <[email protected]> AuthorDate: Tue Dec 11 20:15:31 2018 -0800 Adding a class to get statistics about a request (query) processed by Pinot This will enable us to publish such statistics about each request to a stream. This stream can eventually be consumed by Pinot, and data from the stream analyzed on a per-table basis. --- .../pinot/broker/api/RequestStatistics.java | 171 +++++++++++++++++++++ .../broker/api/resources/PinotClientRequest.java | 5 +- .../requesthandler/BaseBrokerRequestHandler.java | 23 ++- .../requesthandler/BrokerRequestHandler.java | 4 +- 4 files changed, 199 insertions(+), 4 deletions(-) diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/RequestStatistics.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/RequestStatistics.java new file mode 100644 index 0000000..02a0ba7 --- /dev/null +++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/RequestStatistics.java @@ -0,0 +1,171 @@ +/** + * Copyright (C) 2014-2015 LinkedIn Corp. ([email protected]) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.linkedin.pinot.broker.api; + +import com.linkedin.pinot.common.request.BrokerRequest; +import com.linkedin.pinot.common.request.QueryType; +import com.linkedin.pinot.common.response.BrokerResponse; + + +/** + * A class to hold the details regarding a request and the statistics. + * This object can be used to publish the query processing statistics to a stream for + * post-processing at a finer level than metrics. + */ +public class RequestStatistics { + private int _errorCode = 0; + private String _pql; + private String _tableName = "NotYetParsed"; + private long _processingTimeMillis = -1; + + private long _totalDocs; + private long _numDocsScanned; + private long _numEntriesScannedInFilter; + private long _numEntriesScannedPostFilter; + private long _numSegmentsQueried; + private long _numSegmentsProcessed; + private long _numSegmentsMatched; + private int _numServersQueried; + private int _numServersResponded; + private boolean _isNumGroupsLimitReached; + private int _numExceptions; + private boolean _isGroupBy; + private boolean _isAggregation; + + public enum FanoutType { + OFFLINE, + REALTIME, + HYBRID + } + + private FanoutType _fanoutType; + + public RequestStatistics() { + } + + public void setErrorCode(int errorCode) { + _errorCode = errorCode; + } + + public void setPql(String pql) { + _pql = pql; + } + + public void setTableName(String tableName) { + _tableName = tableName; + } + + public void setQueryProcessingTime(long processingTimeMillis) { + _processingTimeMillis = processingTimeMillis; + } + + public void setStatistics(BrokerResponse brokerResponse) { + _totalDocs = brokerResponse.getTotalDocs(); + _numDocsScanned = brokerResponse.getNumDocsScanned(); + _numEntriesScannedInFilter = brokerResponse.getNumEntriesScannedInFilter(); + _numEntriesScannedPostFilter = brokerResponse.getNumEntriesScannedPostFilter(); + _numSegmentsQueried = brokerResponse.getNumSegmentsQueried(); + _numSegmentsProcessed = brokerResponse.getNumSegmentsProcessed(); + _numSegmentsMatched = brokerResponse.getNumSegmentsMatched(); + _numServersQueried = brokerResponse.getNumServersQueried(); + _numSegmentsProcessed = brokerResponse.getNumSegmentsProcessed(); + _numServersResponded = brokerResponse.getNumServersResponded(); + _isNumGroupsLimitReached = brokerResponse.isNumGroupsLimitReached(); + _numExceptions = brokerResponse.getExceptionsSize(); + } + + public void setStatistics(BrokerRequest brokerRequest) { + _isGroupBy = brokerRequest.isSetGroupBy(); + _isAggregation = brokerRequest.isSetAggregationsInfo(); + } + + public void setFanoutType(FanoutType fanoutType) { + _fanoutType = fanoutType; + } + + public FanoutType getFanoutType() { + return _fanoutType; + } + + public boolean isAggregation() { + return _isAggregation; + } + + public boolean isGroupBy() { + return _isGroupBy; + } + + public int getErrorCode() { + return _errorCode; + } + + public String getPql() { + return _pql; + } + + public String getTableName() { + return _tableName; + } + + public long getProcessingTimeMillis() { + return _processingTimeMillis; + } + + public long getTotalDocs() { + return _totalDocs; + } + + public long getNumDocsScanned() { + return _numDocsScanned; + } + + public long getNumEntriesScannedInFilter() { + return _numEntriesScannedInFilter; + } + + public long getNumEntriesScannedPostFilter() { + return _numEntriesScannedPostFilter; + } + + public long getNumSegmentsQueried() { + return _numSegmentsQueried; + } + + public long getNumSegmentsProcessed() { + return _numSegmentsProcessed; + } + + public long getNumSegmentsMatched() { + return _numSegmentsMatched; + } + + public int getNumServersQueried() { + return _numServersQueried; + } + + public int getNumServersResponded() { + return _numServersResponded; + } + + public boolean isNumGroupsLimitReached() { + return _isNumGroupsLimitReached; + } + + public int getNumExceptions() { + return _numExceptions; + } +} diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java index 5b2bc6e..eccd43a 100644 --- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java +++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java @@ -15,6 +15,7 @@ */ package com.linkedin.pinot.broker.api.resources; +import com.linkedin.pinot.broker.api.RequestStatistics; import com.linkedin.pinot.broker.requesthandler.BrokerRequestHandler; import com.linkedin.pinot.common.metrics.BrokerMeter; import com.linkedin.pinot.common.metrics.BrokerMetrics; @@ -74,7 +75,7 @@ public class PinotClientRequest { if (debugOptions != null) { requestJson.put(DEBUG_OPTIONS, debugOptions); } - BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null); + BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null, new RequestStatistics()); return brokerResponse.toJsonString(); } catch (Exception e) { LOGGER.error("Caught exception while processing GET request", e); @@ -94,7 +95,7 @@ public class PinotClientRequest { public String processQueryPost(String query) { try { JSONObject requestJson = new JSONObject(query); - BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null); + BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null, new RequestStatistics()); return brokerResponse.toJsonString(); } catch (Exception e) { LOGGER.error("Caught exception while processing GET request", e); diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index c06c3ff..456a7bf 100644 --- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -16,6 +16,7 @@ package com.linkedin.pinot.broker.requesthandler; import com.google.common.base.Splitter; +import com.linkedin.pinot.broker.api.RequestStatistics; import com.linkedin.pinot.broker.api.RequesterIdentity; import com.linkedin.pinot.broker.broker.AccessControlFactory; import com.linkedin.pinot.broker.queryquota.TableQueryQuotaManager; @@ -105,11 +106,13 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } @Override - public BrokerResponse handleRequest(JSONObject request, @Nullable RequesterIdentity requesterIdentity) + public BrokerResponse handleRequest(JSONObject request, @Nullable RequesterIdentity requesterIdentity, + RequestStatistics requestStatistics) throws Exception { long requestId = _requestIdGenerator.incrementAndGet(); String query = request.getString(PQL); LOGGER.debug("Query string for request {}: {}", requestId, query); + requestStatistics.setPql(query); // Compile the request long compilationStartTimeNs = System.nanoTime(); @@ -119,10 +122,13 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } catch (Exception e) { LOGGER.info("Caught exception while compiling request {}: {}, {}", requestId, query, e.getMessage()); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1); + requestStatistics.setErrorCode(QueryException.PQL_PARSING_ERROR_CODE); return new BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR, e)); } String tableName = brokerRequest.getQuerySource().getTableName(); String rawTableName = TableNameBuilder.extractRawTableName(tableName); + requestStatistics.setTableName(rawTableName); + requestStatistics.setStatistics(brokerRequest); long compilationEndTimeNs = System.nanoTime(); _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REQUEST_COMPILATION, compilationEndTimeNs - compilationStartTimeNs); @@ -133,6 +139,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if (!hasAccess) { _brokerMetrics.addMeteredTableValue(brokerRequest.getQuerySource().getTableName(), BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1); + requestStatistics.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE); return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR); } _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION, @@ -166,6 +173,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if ((offlineTableName == null) && (realtimeTableName == null)) { // No table matches the request LOGGER.info("No table matches for request {}: {}", requestId, query); + requestStatistics.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS, 1); return BrokerResponseNative.NO_TABLE_RESULT; } @@ -175,6 +183,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { String errorMessage = String.format("Request %d exceeds query quota for table:%s, query:%s", requestId, tableName, query); LOGGER.info(errorMessage); + requestStatistics.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE); _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1); return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage)); } @@ -184,6 +193,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { validateRequest(brokerRequest); } catch (Exception e) { LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, query, e.getMessage()); + requestStatistics.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e)); } @@ -228,6 +238,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { Map<String, List<String>> offlineRoutingTable = null; Map<String, List<String>> realtimeRoutingTable = null; if (offlineBrokerRequest != null) { + if (realtimeBrokerRequest == null) { + requestStatistics.setFanoutType(RequestStatistics.FanoutType.OFFLINE); + } offlineRoutingTable = _routingTable.getRoutingTable(new RoutingTableLookupRequest(offlineBrokerRequest)); if (offlineRoutingTable.isEmpty()) { LOGGER.debug("No OFFLINE server found for request {}: {}", requestId, query); @@ -236,6 +249,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } } if (realtimeBrokerRequest != null) { + if (offlineBrokerRequest == null) { + requestStatistics.setFanoutType(RequestStatistics.FanoutType.REALTIME); + } realtimeRoutingTable = _routingTable.getRoutingTable(new RoutingTableLookupRequest(realtimeBrokerRequest)); if (realtimeRoutingTable.isEmpty()) { LOGGER.debug("No REALTIME server found for request {}: {}", requestId, query); @@ -243,6 +259,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { realtimeRoutingTable = null; } } + if (offlineBrokerRequest != null && realtimeBrokerRequest != null) { + requestStatistics.setFanoutType(RequestStatistics.FanoutType.HYBRID); + } if (offlineBrokerRequest == null && realtimeBrokerRequest == null) { LOGGER.info("No server found for request {}: {}", requestId, query); _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NO_SERVER_FOUND_EXCEPTIONS, 1); @@ -269,6 +288,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { // Set total query processing time long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - compilationStartTimeNs); brokerResponse.setTimeUsedMs(totalTimeMs); + requestStatistics.setQueryProcessingTime(totalTimeMs); + requestStatistics.setStatistics(brokerResponse); LOGGER.debug("Broker Response: {}", brokerResponse); diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BrokerRequestHandler.java index 4513741..5cc2bd9 100644 --- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BrokerRequestHandler.java +++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BrokerRequestHandler.java @@ -15,6 +15,7 @@ */ package com.linkedin.pinot.broker.requesthandler; +import com.linkedin.pinot.broker.api.RequestStatistics; import com.linkedin.pinot.broker.api.RequesterIdentity; import com.linkedin.pinot.common.response.BrokerResponse; import javax.annotation.Nullable; @@ -29,5 +30,6 @@ public interface BrokerRequestHandler { void shutDown(); - BrokerResponse handleRequest(JSONObject request, @Nullable RequesterIdentity requesterIdentity) throws Exception; + BrokerResponse handleRequest(JSONObject request, @Nullable RequesterIdentity requesterIdentity, + RequestStatistics requestStatistics) throws Exception; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
