This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 65aa0ec  Adding a class to get statistics about a request (query) 
processed by… (#3614)
65aa0ec is described below

commit 65aa0ecbd166d91a04bc660d538eecf69c60deca
Author: Subbu Subramaniam <[email protected]>
AuthorDate: Fri Dec 14 13:31:27 2018 -0800

    Adding a class to get statistics about a request (query) processed by… 
(#3614)
    
    * Adding a class to get statistics about a request (query) processed by 
Pinot
    
    This will enable us to publish such statistics about each request to a 
stream.
    This stream can eventually be consumed by Pinot, and data from the stream
    analyzed on a per-table basis.
    
    * Addressed review comments
    
    Removed all query-description-related parameters from RequestStatistics.
    Added other timing related parameters that we can add now.
    
    * Addresed review comments
---
 .../pinot/broker/api/RequestStatistics.java        | 175 +++++++++++++++++++++
 .../broker/api/resources/PinotClientRequest.java   |   5 +-
 .../requesthandler/BaseBrokerRequestHandler.java   |  30 +++-
 .../requesthandler/BrokerRequestHandler.java       |   8 +-
 .../ConnectionPoolBrokerRequestHandler.java        |   8 +-
 .../SingleConnectionBrokerRequestHandler.java      |   8 +-
 6 files changed, 224 insertions(+), 10 deletions(-)

diff --git 
a/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/RequestStatistics.java
 
b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/RequestStatistics.java
new file mode 100644
index 0000000..da7db2e
--- /dev/null
+++ 
b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/RequestStatistics.java
@@ -0,0 +1,175 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.pinot.broker.api;
+
+import com.linkedin.pinot.common.response.BrokerResponse;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A class to hold the details regarding a request and the statistics.
+ * This object can be used to publish the query processing statistics to a 
stream for
+ * post-processing at a finer level than metrics.
+ */
+public class RequestStatistics {
+  private int _errorCode = 0;
+  private String _pql;
+  private String _tableName = "NotYetParsed";
+  private long _processingTimeMillis = -1;
+
+  private long _totalDocs;
+  private long _numDocsScanned;
+  private long _numEntriesScannedInFilter;
+  private long _numEntriesScannedPostFilter;
+  private long _numSegmentsQueried;
+  private long _numSegmentsProcessed;
+  private long _numSegmentsMatched;
+  private int _numServersQueried;
+  private int _numServersResponded;
+  private boolean _isNumGroupsLimitReached;
+  private int _numExceptions;
+  private String _brokerId;
+  private long _requestId;
+  private long _requestArrivalTimeMillis;
+  private long _reduceTimeMillis;
+
+  public enum FanoutType {
+    OFFLINE,
+    REALTIME,
+    HYBRID
+  }
+
+  private FanoutType _fanoutType;
+
+  public RequestStatistics() {
+  }
+
+  public void setErrorCode(int errorCode) {
+    _errorCode = errorCode;
+  }
+
+  public void setPql(String pql) {
+    _pql = pql;
+  }
+
+  public void setTableName(String tableName) {
+    _tableName = tableName;
+  }
+
+  public void setQueryProcessingTime(long processingTimeMillis) {
+    _processingTimeMillis = processingTimeMillis;
+  }
+
+  public void setStatistics(BrokerResponse brokerResponse) {
+    _totalDocs = brokerResponse.getTotalDocs();
+    _numDocsScanned = brokerResponse.getNumDocsScanned();
+    _numEntriesScannedInFilter = brokerResponse.getNumEntriesScannedInFilter();
+    _numEntriesScannedPostFilter = 
brokerResponse.getNumEntriesScannedPostFilter();
+    _numSegmentsQueried = brokerResponse.getNumSegmentsQueried();
+    _numSegmentsProcessed = brokerResponse.getNumSegmentsProcessed();
+    _numSegmentsMatched = brokerResponse.getNumSegmentsMatched();
+    _numServersQueried = brokerResponse.getNumServersQueried();
+    _numSegmentsProcessed = brokerResponse.getNumSegmentsProcessed();
+    _numServersResponded = brokerResponse.getNumServersResponded();
+    _isNumGroupsLimitReached = brokerResponse.isNumGroupsLimitReached();
+    _numExceptions = brokerResponse.getExceptionsSize();
+  }
+
+  public void setBrokerId(String brokerId) {
+    _brokerId = brokerId;
+  }
+
+  public void setRequestId(long requestId) {
+    _requestId = requestId;
+  }
+
+  public void setRequestArrivalTimeMillis(long requestArrivalTimeMillis) {
+    _requestArrivalTimeMillis = requestArrivalTimeMillis;
+  }
+
+  public void setReduceTimeNanos(long reduceTimeNanos) {
+    _reduceTimeMillis = TimeUnit.MILLISECONDS.convert(reduceTimeNanos, 
TimeUnit.NANOSECONDS);
+  }
+
+  public void setFanoutType(FanoutType fanoutType) {
+    _fanoutType = fanoutType;
+  }
+
+  public FanoutType getFanoutType() {
+    return _fanoutType;
+  }
+
+  public int getErrorCode() {
+    return _errorCode;
+  }
+
+  public String getPql() {
+    return _pql;
+  }
+
+  public String getTableName() {
+    return _tableName;
+  }
+
+  public long getProcessingTimeMillis() {
+    return _processingTimeMillis;
+  }
+
+  public long getTotalDocs() {
+    return _totalDocs;
+  }
+
+  public long getNumDocsScanned() {
+    return _numDocsScanned;
+  }
+
+  public long getNumEntriesScannedInFilter() {
+    return _numEntriesScannedInFilter;
+  }
+
+  public long getNumEntriesScannedPostFilter() {
+    return _numEntriesScannedPostFilter;
+  }
+
+  public long getNumSegmentsQueried() {
+    return _numSegmentsQueried;
+  }
+
+  public long getNumSegmentsProcessed() {
+    return _numSegmentsProcessed;
+  }
+
+  public long getNumSegmentsMatched() {
+    return _numSegmentsMatched;
+  }
+
+  public int getNumServersQueried() {
+    return _numServersQueried;
+  }
+
+  public int getNumServersResponded() {
+    return _numServersResponded;
+  }
+
+  public boolean isNumGroupsLimitReached() {
+    return _isNumGroupsLimitReached;
+  }
+
+  public int getNumExceptions() {
+    return _numExceptions;
+  }
+}
diff --git 
a/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java
 
b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java
index 5b2bc6e..eccd43a 100644
--- 
a/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java
+++ 
b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.broker.api.resources;
 
+import com.linkedin.pinot.broker.api.RequestStatistics;
 import com.linkedin.pinot.broker.requesthandler.BrokerRequestHandler;
 import com.linkedin.pinot.common.metrics.BrokerMeter;
 import com.linkedin.pinot.common.metrics.BrokerMetrics;
@@ -74,7 +75,7 @@ public class PinotClientRequest {
       if (debugOptions != null) {
         requestJson.put(DEBUG_OPTIONS, debugOptions);
       }
-      BrokerResponse brokerResponse = 
requestHandler.handleRequest(requestJson, null);
+      BrokerResponse brokerResponse = 
requestHandler.handleRequest(requestJson, null, new RequestStatistics());
       return brokerResponse.toJsonString();
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing GET request", e);
@@ -94,7 +95,7 @@ public class PinotClientRequest {
   public String processQueryPost(String query) {
     try {
       JSONObject requestJson = new JSONObject(query);
-      BrokerResponse brokerResponse = 
requestHandler.handleRequest(requestJson, null);
+      BrokerResponse brokerResponse = 
requestHandler.handleRequest(requestJson, null, new RequestStatistics());
       return brokerResponse.toJsonString();
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing GET request", e);
diff --git 
a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index c06c3ff..e12afcc 100644
--- 
a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.broker.requesthandler;
 
 import com.google.common.base.Splitter;
+import com.linkedin.pinot.broker.api.RequestStatistics;
 import com.linkedin.pinot.broker.api.RequesterIdentity;
 import com.linkedin.pinot.broker.broker.AccessControlFactory;
 import com.linkedin.pinot.broker.queryquota.TableQueryQuotaManager;
@@ -103,13 +104,25 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       return "";
     }
   }
-
+  
   @Override
   public BrokerResponse handleRequest(JSONObject request, @Nullable 
RequesterIdentity requesterIdentity)
       throws Exception {
+    return handleRequest(request, requesterIdentity, new RequestStatistics());
+  }
+
+  @Override
+  public BrokerResponse handleRequest(JSONObject request, @Nullable 
RequesterIdentity requesterIdentity,
+      RequestStatistics requestStatistics)
+      throws Exception {
     long requestId = _requestIdGenerator.incrementAndGet();
+    requestStatistics.setBrokerId(_brokerId);
+    requestStatistics.setRequestId(requestId);
+    requestStatistics.setRequestArrivalTimeMillis(System.currentTimeMillis());
+
     String query = request.getString(PQL);
     LOGGER.debug("Query string for request {}: {}", requestId, query);
+    requestStatistics.setPql(query);
 
     // Compile the request
     long compilationStartTimeNs = System.nanoTime();
@@ -119,10 +132,12 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     } catch (Exception e) {
       LOGGER.info("Caught exception while compiling request {}: {}, {}", 
requestId, query, e.getMessage());
       
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS,
 1);
+      requestStatistics.setErrorCode(QueryException.PQL_PARSING_ERROR_CODE);
       return new 
BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR,
 e));
     }
     String tableName = brokerRequest.getQuerySource().getTableName();
     String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    requestStatistics.setTableName(rawTableName);
     long compilationEndTimeNs = System.nanoTime();
     _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.REQUEST_COMPILATION,
         compilationEndTimeNs - compilationStartTimeNs);
@@ -133,6 +148,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     if (!hasAccess) {
       
_brokerMetrics.addMeteredTableValue(brokerRequest.getQuerySource().getTableName(),
           BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);
+      requestStatistics.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
       return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
     }
     _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
@@ -166,6 +182,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     if ((offlineTableName == null) && (realtimeTableName == null)) {
       // No table matches the request
       LOGGER.info("No table matches for request {}: {}", requestId, query);
+      
requestStatistics.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE);
       
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS, 
1);
       return BrokerResponseNative.NO_TABLE_RESULT;
     }
@@ -175,6 +192,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       String errorMessage =
           String.format("Request %d exceeds query quota for table:%s, 
query:%s", requestId, tableName, query);
       LOGGER.info(errorMessage);
+      
requestStatistics.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
       _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
       return new 
BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR,
 errorMessage));
     }
@@ -184,6 +202,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       validateRequest(brokerRequest);
     } catch (Exception e) {
       LOGGER.info("Caught exception while validating request {}: {}, {}", 
requestId, query, e.getMessage());
+      
requestStatistics.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
       _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
       return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
 e));
     }
@@ -213,14 +232,17 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       // Hybrid
       offlineBrokerRequest = 
_brokerRequestOptimizer.optimize(getOfflineBrokerRequest(brokerRequest), 
timeColumn);
       realtimeBrokerRequest = 
_brokerRequestOptimizer.optimize(getRealtimeBrokerRequest(brokerRequest), 
timeColumn);
+      requestStatistics.setFanoutType(RequestStatistics.FanoutType.HYBRID);
     } else if (offlineTableName != null) {
       // OFFLINE only
       brokerRequest.getQuerySource().setTableName(offlineTableName);
       offlineBrokerRequest = _brokerRequestOptimizer.optimize(brokerRequest, 
timeColumn);
+      requestStatistics.setFanoutType(RequestStatistics.FanoutType.OFFLINE);
     } else {
       // REALTIME only
       brokerRequest.getQuerySource().setTableName(realtimeTableName);
       realtimeBrokerRequest = _brokerRequestOptimizer.optimize(brokerRequest, 
timeColumn);
+      requestStatistics.setFanoutType(RequestStatistics.FanoutType.REALTIME);
     }
 
     // Calculate routing table for the query
@@ -256,7 +278,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     ServerStats serverStats = new ServerStats();
     BrokerResponse brokerResponse =
         processBrokerRequest(requestId, brokerRequest, offlineBrokerRequest, 
offlineRoutingTable, realtimeBrokerRequest,
-            realtimeRoutingTable, remainingTimeMs, serverStats);
+            realtimeRoutingTable, remainingTimeMs, serverStats, 
requestStatistics);
     long executionEndTimeNs = System.nanoTime();
     _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.QUERY_EXECUTION,
         executionEndTimeNs - routingEndTimeNs);
@@ -269,6 +291,8 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     // Set total query processing time
     long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - 
compilationStartTimeNs);
     brokerResponse.setTimeUsedMs(totalTimeMs);
+    requestStatistics.setQueryProcessingTime(totalTimeMs);
+    requestStatistics.setStatistics(brokerResponse);
 
     LOGGER.debug("Broker Response: {}", brokerResponse);
 
@@ -404,7 +428,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   protected abstract BrokerResponse processBrokerRequest(long requestId, 
BrokerRequest originalBrokerRequest,
       @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<String, 
List<String>> offlineRoutingTable,
       @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<String, 
List<String>> realtimeRoutingTable,
-      long timeoutMs, ServerStats serverStats) throws Exception;
+      long timeoutMs, ServerStats serverStats, RequestStatistics 
requestStatistics) throws Exception;
 
   /**
    * Helper class to pass the per server statistics.
diff --git 
a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BrokerRequestHandler.java
 
b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BrokerRequestHandler.java
index 4513741..6310a20 100644
--- 
a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BrokerRequestHandler.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.broker.requesthandler;
 
+import com.linkedin.pinot.broker.api.RequestStatistics;
 import com.linkedin.pinot.broker.api.RequesterIdentity;
 import com.linkedin.pinot.common.response.BrokerResponse;
 import javax.annotation.Nullable;
@@ -29,5 +30,10 @@ public interface BrokerRequestHandler {
 
   void shutDown();
 
-  BrokerResponse handleRequest(JSONObject request, @Nullable RequesterIdentity 
requesterIdentity) throws Exception;
+  @Deprecated
+  BrokerResponse handleRequest(JSONObject request, @Nullable RequesterIdentity 
requesterIdentity)
+      throws Exception;
+
+  BrokerResponse handleRequest(JSONObject request, @Nullable RequesterIdentity 
requesterIdentity,
+      RequestStatistics requestStatistics) throws Exception;
 }
diff --git 
a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
index b17eb42..ef46895 100644
--- 
a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.broker.requesthandler;
 
+import com.linkedin.pinot.broker.api.RequestStatistics;
 import com.linkedin.pinot.broker.broker.AccessControlFactory;
 import com.linkedin.pinot.broker.broker.helix.LiveInstancesChangeListenerImpl;
 import com.linkedin.pinot.broker.queryquota.TableQueryQuotaManager;
@@ -131,7 +132,7 @@ public class ConnectionPoolBrokerRequestHandler extends 
BaseBrokerRequestHandler
   protected BrokerResponse processBrokerRequest(long requestId, BrokerRequest 
originalBrokerRequest,
       @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<String, 
List<String>> offlineRoutingTable,
       @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<String, 
List<String>> realtimeRoutingTable,
-      long timeoutMs, ServerStats serverStats) throws Exception {
+      long timeoutMs, ServerStats serverStats, RequestStatistics 
requestStatistics) throws Exception {
     ScatterGatherStats scatterGatherStats = new ScatterGatherStats();
     PhaseTimes phaseTimes = new PhaseTimes();
 
@@ -179,6 +180,7 @@ public class ConnectionPoolBrokerRequestHandler extends 
BaseBrokerRequestHandler
     }
     long gatherEndTimeNs = System.nanoTime();
     phaseTimes.addToGatherTime(gatherEndTimeNs - gatherStartTimeNs);
+    // TODO Use scatterGatherStats as serverStats
     serverStats.setServerStats(scatterGatherStats.toString());
 
     //Step 3: deserialize the server responses
@@ -204,7 +206,9 @@ public class ConnectionPoolBrokerRequestHandler extends 
BaseBrokerRequestHandler
     // Step 4: reduce (merge) the server responses and create a broker 
response to be returned
     BrokerResponse brokerResponse =
         _brokerReduceService.reduceOnDataTable(originalBrokerRequest, 
dataTableMap, _brokerMetrics);
-    phaseTimes.addToReduceTime(System.nanoTime() - deserializationEndTimeNs);
+    final long reduceTimeNanos = System.nanoTime() - deserializationEndTimeNs;
+    phaseTimes.addToReduceTime(reduceTimeNanos);
+    requestStatistics.setReduceTimeNanos(reduceTimeNanos);
 
     // Set processing exceptions and number of servers queried/responded
     brokerResponse.setExceptions(processingExceptions);
diff --git 
a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 762b724..73c8040 100644
--- 
a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.broker.requesthandler;
 
+import com.linkedin.pinot.broker.api.RequestStatistics;
 import com.linkedin.pinot.broker.broker.AccessControlFactory;
 import com.linkedin.pinot.broker.queryquota.TableQueryQuotaManager;
 import com.linkedin.pinot.broker.routing.RoutingTable;
@@ -69,7 +70,7 @@ public class SingleConnectionBrokerRequestHandler extends 
BaseBrokerRequestHandl
   protected BrokerResponse processBrokerRequest(long requestId, BrokerRequest 
originalBrokerRequest,
       @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<String, 
List<String>> offlineRoutingTable,
       @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<String, 
List<String>> realtimeRoutingTable,
-      long timeoutMs, ServerStats serverStats) throws Exception {
+      long timeoutMs, ServerStats serverStats, RequestStatistics 
requestStatistics) throws Exception {
     assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
 
     String rawTableName = 
TableNameBuilder.extractRawTableName(originalBrokerRequest.getQuerySource().getTableName());
@@ -80,6 +81,7 @@ public class SingleConnectionBrokerRequestHandler extends 
BaseBrokerRequestHandl
     Map<Server, ServerResponse> response = asyncQueryResponse.getResponse();
     _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.SCATTER_GATHER,
         System.nanoTime() - scatterGatherStartTimeNs);
+    // TODO Use scatterGatherStats as serverStats
     serverStats.setServerStats(asyncQueryResponse.getStats());
 
     // TODO: do not convert Server to ServerInstance
@@ -104,7 +106,9 @@ public class SingleConnectionBrokerRequestHandler extends 
BaseBrokerRequestHandl
     long reduceStartTimeNs = System.nanoTime();
     BrokerResponseNative brokerResponse =
         _brokerReduceService.reduceOnDataTable(originalBrokerRequest, 
dataTableMap, _brokerMetrics);
-    _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REDUCE, 
System.nanoTime() - reduceStartTimeNs);
+    final long reduceTimeNanos = System.nanoTime() - reduceStartTimeNs;
+    requestStatistics.setReduceTimeNanos(reduceTimeNanos);
+    _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REDUCE, 
reduceTimeNanos);
 
     brokerResponse.setNumServersQueried(numServersQueried);
     brokerResponse.setNumServersResponded(numServersResponded);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to