This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 996f060615 Log query and emit metrics for grpc request and multi-stage 
leaf stage (#11838)
996f060615 is described below

commit 996f0606151b2654a66f67f1661a417fc9153f97
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Oct 20 17:20:30 2023 -0700

    Log query and emit metrics for grpc request and multi-stage leaf stage 
(#11838)
---
 .../jmx_prometheus_javaagent/configs/server.yml    |   6 +
 .../apache/pinot/common/metrics/ServerGauge.java   |   1 -
 .../apache/pinot/common/metrics/ServerMeter.java   |   1 +
 .../pinot/core/query/logger/ServerQueryLogger.java | 227 +++++++++++++++++++++
 .../pinot/core/query/scheduler/QueryScheduler.java | 161 +--------------
 .../pinot/core/transport/grpc/GrpcQueryServer.java |   7 +
 .../LeafStageTransferableBlockOperator.java        |   8 +
 .../org/apache/pinot/server/conf/ServerConf.java   |  11 +
 .../pinot/server/starter/ServerInstance.java       |   5 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |  13 ++
 10 files changed, 287 insertions(+), 153 deletions(-)

diff --git 
a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/server.yml 
b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/server.yml
index 4da3548947..5e63275923 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/server.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/server.yml
@@ -112,6 +112,12 @@ rules:
   cache: true
   labels:
     table: "$1"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\", 
name=\"pinot.server.numResizes.([^\\.]*?)_(OFFLINE|REALTIME)\"><>(\\w+)"
+  name: "pinot_server_numResizes_$3"
+  cache: true
+  labels:
+    table: "$1"
+    tableType: "$2"
 - pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\", 
name=\"pinot.server.resizeTimeMs.([^\\.]*?)_(OFFLINE|REALTIME)\"><>(\\w+)"
   name: "pinot_server_resizeTimeMs_$3"
   cache: true
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 626c378146..45f34803a0 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -40,7 +40,6 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   REALTIME_OFFHEAP_MEMORY_USED("bytes", false),
   REALTIME_SEGMENT_NUM_PARTITIONS("realtimeSegmentNumPartitions", false),
   LLC_SIMULTANEOUS_SEGMENT_BUILDS("llcSimultaneousSegmentBuilds", true),
-  RESIZE_TIME_MS("milliseconds", false),
   // Upsert metrics
   UPSERT_PRIMARY_KEYS_COUNT("upsertPrimaryKeysCount", false),
   // Dedup metrics
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index e66199dcf5..f77194d37b 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -81,6 +81,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   // Emitted only by Server to Deep-store segment uploader.
   SEGMENT_UPLOAD_TIMEOUT("segments", false),
   NUM_RESIZES("numResizes", false),
+  RESIZE_TIME_MS("resizeTimeMs", false),
   NO_TABLE_ACCESS("tables", true),
   INDEXING_FAILURES("attributeValues", true),
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java
new file mode 100644
index 0000000000..2d72fc04ef
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.logger;
+
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.metrics.ServerTimer;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.TimerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@SuppressWarnings("UnstableApiUsage")
+public class ServerQueryLogger {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerQueryLogger.class);
+  private static final AtomicReference<ServerQueryLogger> INSTANCE = new 
AtomicReference<>();
+
+  private final ServerMetrics _serverMetrics;
+  private final RateLimiter _queryLogRateLimiter;
+  private final RateLimiter _droppedReportRateLimiter;
+  private final AtomicInteger _numDroppedLogs = new AtomicInteger();
+
+  public static void init(double queryLogMaxRate, double droppedReportMaxRate, 
ServerMetrics serverMetrics) {
+    if (INSTANCE.compareAndSet(null, new ServerQueryLogger(queryLogMaxRate, 
droppedReportMaxRate, serverMetrics))) {
+      LOGGER.info("Initialized ServerQueryLogger with query log max rate: {}, 
dropped report max rate: {}",
+          queryLogMaxRate, droppedReportMaxRate);
+    } else {
+      LOGGER.error("ServerQueryLogger is already initialized, not initializing 
it again");
+    }
+  }
+
+  @Nullable
+  public static ServerQueryLogger getInstance() {
+    // NOTE: In some tests, ServerQueryLogger might not be initialized. 
Returns null when it is not initialized.
+    return INSTANCE.get();
+  }
+
+  private ServerQueryLogger(double queryLogMaxRate, double 
droppedReportMaxRate, ServerMetrics serverMetrics) {
+    _serverMetrics = serverMetrics;
+    _queryLogRateLimiter = RateLimiter.create(queryLogMaxRate);
+    _droppedReportRateLimiter = RateLimiter.create(droppedReportMaxRate);
+  }
+
+  public void logQuery(ServerQueryRequest request, InstanceResponseBlock 
response, String schedulerType) {
+    String tableNameWithType = request.getTableNameWithType();
+    Map<String, String> responseMetadata = response.getResponseMetadata();
+
+    long numDocsScanned = getLongValue(responseMetadata, 
MetadataKey.NUM_DOCS_SCANNED.getName(), -1);
+    addToTableMeter(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED, 
numDocsScanned);
+
+    long numEntriesScannedInFilter =
+        getLongValue(responseMetadata, 
MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), -1);
+    addToTableMeter(tableNameWithType, 
ServerMeter.NUM_ENTRIES_SCANNED_IN_FILTER, numEntriesScannedInFilter);
+
+    long numEntriesScannedPostFilter =
+        getLongValue(responseMetadata, 
MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), -1);
+    addToTableMeter(tableNameWithType, 
ServerMeter.NUM_ENTRIES_SCANNED_POST_FILTER, numEntriesScannedPostFilter);
+
+    int numSegmentsQueried = request.getSegmentsToQuery().size();
+    addToTableMeter(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, 
numSegmentsQueried);
+
+    long numSegmentsProcessed = getLongValue(responseMetadata, 
MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), -1);
+    addToTableMeter(tableNameWithType, ServerMeter.NUM_SEGMENTS_PROCESSED, 
numSegmentsProcessed);
+
+    long numSegmentsMatched = getLongValue(responseMetadata, 
MetadataKey.NUM_SEGMENTS_MATCHED.getName(), -1);
+    addToTableMeter(tableNameWithType, ServerMeter.NUM_SEGMENTS_MATCHED, 
numSegmentsMatched);
+
+    long numSegmentsPrunedInvalid =
+        getLongValue(responseMetadata, 
MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), -1);
+    addToTableMeter(tableNameWithType, 
ServerMeter.NUM_SEGMENTS_PRUNED_INVALID, numSegmentsPrunedInvalid);
+
+    long numSegmentsPrunedByLimit =
+        getLongValue(responseMetadata, 
MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), -1);
+    addToTableMeter(tableNameWithType, 
ServerMeter.NUM_SEGMENTS_PRUNED_BY_LIMIT, numSegmentsPrunedByLimit);
+
+    long numSegmentsPrunedByValue =
+        getLongValue(responseMetadata, 
MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), -1);
+    addToTableMeter(tableNameWithType, 
ServerMeter.NUM_SEGMENTS_PRUNED_BY_VALUE, numSegmentsPrunedByValue);
+
+    long numConsumingSegmentsQueried =
+        getLongValue(responseMetadata, 
MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), -1);
+    long numConsumingSegmentsProcessed =
+        getLongValue(responseMetadata, 
MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), -1);
+    long numConsumingSegmentsMatched =
+        getLongValue(responseMetadata, 
MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), -1);
+
+    long minConsumingFreshnessMs =
+        getLongValue(responseMetadata, 
MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), -1);
+    if (minConsumingFreshnessMs > 0 && minConsumingFreshnessMs != 
Long.MAX_VALUE) {
+      _serverMetrics.addTimedTableValue(tableNameWithType, 
ServerTimer.FRESHNESS_LAG_MS,
+          (System.currentTimeMillis() - minConsumingFreshnessMs), 
TimeUnit.MILLISECONDS);
+    }
+
+    long numResizes = getLongValue(responseMetadata, 
MetadataKey.NUM_RESIZES.getName(), -1);
+    addToTableMeter(tableNameWithType, ServerMeter.NUM_RESIZES, numResizes);
+
+    long resizeTimeMs = getLongValue(responseMetadata, 
MetadataKey.RESIZE_TIME_MS.getName(), -1);
+    addToTableMeter(tableNameWithType, ServerMeter.RESIZE_TIME_MS, 
resizeTimeMs);
+
+    long threadCpuTimeNs = getLongValue(responseMetadata, 
MetadataKey.THREAD_CPU_TIME_NS.getName(), 0);
+    if (threadCpuTimeNs > 0) {
+      _serverMetrics.addTimedTableValue(tableNameWithType, 
ServerTimer.EXECUTION_THREAD_CPU_TIME_NS, threadCpuTimeNs,
+          TimeUnit.NANOSECONDS);
+      _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.TOTAL_THREAD_CPU_TIME_MILLIS,
+          TimeUnit.MILLISECONDS.convert(threadCpuTimeNs, 
TimeUnit.NANOSECONDS));
+    }
+
+    long systemActivitiesCpuTimeNs =
+        getLongValue(responseMetadata, 
MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(), 0);
+    if (systemActivitiesCpuTimeNs > 0) {
+      _serverMetrics.addTimedTableValue(tableNameWithType, 
ServerTimer.SYSTEM_ACTIVITIES_CPU_TIME_NS,
+          systemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
+    }
+
+    long responseSerializationCpuTimeNs =
+        getLongValue(responseMetadata, 
MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), 0);
+    if (responseSerializationCpuTimeNs > 0) {
+      _serverMetrics.addTimedTableValue(tableNameWithType, 
ServerTimer.RESPONSE_SER_CPU_TIME_NS,
+          responseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
+    }
+
+    long totalCpuTimeNs = threadCpuTimeNs + systemActivitiesCpuTimeNs + 
responseSerializationCpuTimeNs;
+    if (totalCpuTimeNs > 0) {
+      _serverMetrics.addTimedTableValue(tableNameWithType, 
ServerTimer.TOTAL_CPU_TIME_NS, totalCpuTimeNs,
+          TimeUnit.NANOSECONDS);
+    }
+
+    TimerContext timerContext = request.getTimerContext();
+    long schedulerWaitMs = 
timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT);
+
+    // Please keep the format as name=value comma-separated with no spaces
+    // Please add new entries at the end
+    if (_queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, 
numDocsScanned, numSegmentsPrunedInvalid)) {
+      LOGGER.info("Processed requestId={},table={},"
+              + 
"segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/"
+              + "invalid/limit/value)={}/{}/{}/{}/{}/{}/{}/{}/{},"
+              + 
"schedulerWaitMs={},reqDeserMs={},totalExecMs={},resSerMs={},totalTimeMs={},"
+              + 
"minConsumingFreshnessMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},"
+              + 
"threadCpuTimeNs(total/thread/sysActivity/resSer)={}/{}/{}/{}", 
request.getRequestId(),
+          tableNameWithType,
+          numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, 
numConsumingSegmentsQueried,
+          numConsumingSegmentsProcessed, numConsumingSegmentsMatched, 
numSegmentsPrunedInvalid,
+          numSegmentsPrunedByLimit, numSegmentsPrunedByValue, schedulerWaitMs,
+          
timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION),
+          timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
+          
timerContext.getPhaseDurationMs(ServerQueryPhase.RESPONSE_SERIALIZATION),
+          timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), 
minConsumingFreshnessMs,
+          request.getBrokerId(), numDocsScanned, numEntriesScannedInFilter, 
numEntriesScannedPostFilter, schedulerType,
+          totalCpuTimeNs, threadCpuTimeNs, systemActivitiesCpuTimeNs, 
responseSerializationCpuTimeNs);
+
+      // Limit the dropped-logs report to at most once per second.
+      if (_droppedReportRateLimiter.tryAcquire()) {
+        // NOTE: the reported number may not be accurate since we will be 
missing some increments that happened between
+        // get() and set().
+        int numDroppedLogs = _numDroppedLogs.get();
+        if (numDroppedLogs > 0) {
+          LOGGER.info("{} logs were dropped. (log max rate per second: {})", 
numDroppedLogs,
+              _queryLogRateLimiter.getRate());
+          _numDroppedLogs.set(0);
+        }
+      }
+    } else {
+      _numDroppedLogs.getAndIncrement();
+    }
+  }
+
+  private static long getLongValue(Map<String, String> metadata, String key, 
long defaultValue) {
+    String value = metadata.get(key);
+    return value != null ? Long.parseLong(value) : defaultValue;
+  }
+
+  private void addToTableMeter(String tableNameWithType, ServerMeter meter, 
long value) {
+    if (value > 0) {
+      _serverMetrics.addMeteredTableValue(tableNameWithType, meter, value);
+    }
+  }
+
+  /**
+   * Returns {@code true} when the query should be logged even if the query 
log rate is reached.
+   *
+   * TODO: come up with other criteria for forcing a log and come up with 
better numbers.
+   */
+  private static boolean forceLog(long schedulerWaitMs, long numDocsScanned, 
long numSegmentsPrunedInvalid) {
+    // If scheduler wait time is larger than 100ms, force the log.
+    if (schedulerWaitMs > 100) {
+      return true;
+    }
+
+    // If the number of documents scanned is larger than 1 million, force 
the log.
+    if (numDocsScanned > 1_000_000) {
+      return true;
+    }
+
+    // If there are invalid segments, force the log.
+    if (numSegmentsPrunedInvalid > 0) {
+      return true;
+    }
+
+    return false;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 693e45c171..8f5bcb887f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -22,24 +22,20 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableFutureTask;
-import com.google.common.util.concurrent.RateLimiter;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAccumulator;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerQueryPhase;
-import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.logger.ServerQueryLogger;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.TimerContext;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
@@ -53,24 +49,15 @@ import org.slf4j.LoggerFactory;
  * Abstract class providing common scheduler functionality
  * including query runner and query worker pool
  */
-@SuppressWarnings("UnstableApiUsage")
 public abstract class QueryScheduler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryScheduler.class);
 
-  private static final String INVALID_NUM_SCANNED = "-1";
-  private static final String INVALID_SEGMENTS_COUNT = "-1";
-  private static final String INVALID_FRESHNESS_MS = "-1";
-  private static final String INVALID_NUM_RESIZES = "-1";
-  private static final String INVALID_RESIZE_TIME_MS = "-1";
-  private static final String QUERY_LOG_MAX_RATE_KEY = 
"query.log.maxRatePerSecond";
-  private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
   protected final ServerMetrics _serverMetrics;
   protected final QueryExecutor _queryExecutor;
   protected final ResourceManager _resourceManager;
   protected final LongAccumulator _latestQueryTime;
-  private final RateLimiter _queryLogRateLimiter;
-  private final RateLimiter _numDroppedLogRateLimiter;
-  private final AtomicInteger _numDroppedLogCounter;
+  protected final ServerQueryLogger _queryLogger = 
ServerQueryLogger.getInstance();
+
   protected volatile boolean _isRunning = false;
 
   /**
@@ -91,10 +78,6 @@ public abstract class QueryScheduler {
     _resourceManager = resourceManager;
     _queryExecutor = queryExecutor;
     _latestQueryTime = latestQueryTime;
-    _queryLogRateLimiter = 
RateLimiter.create(config.getProperty(QUERY_LOG_MAX_RATE_KEY, 
DEFAULT_QUERY_LOG_MAX_RATE));
-    _numDroppedLogRateLimiter = RateLimiter.create(1.0d);
-    _numDroppedLogCounter = new AtomicInteger(0);
-    LOGGER.info("Query log max rate: {}", _queryLogRateLimiter.getRate());
   }
 
   /**
@@ -166,14 +149,17 @@ public abstract class QueryScheduler {
       long requestId = queryRequest.getRequestId();
       Map<String, String> responseMetadata = 
instanceResponse.getResponseMetadata();
       responseMetadata.put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
-
       byte[] responseBytes = serializeResponse(queryRequest, instanceResponse);
 
-      Map<String, String> queryOptions = 
queryRequest.getQueryContext().getQueryOptions();
-      Long maxResponseSizeBytes = 
QueryOptionsUtils.getMaxServerResponseSizeBytes(queryOptions);
+      // Log the statistics
+      if (_queryLogger != null) {
+        _queryLogger.logQuery(queryRequest, instanceResponse, name());
+      }
 
       // TODO: Perform this check sooner during the serialization of DataTable.
-      if (maxResponseSizeBytes != null && responseBytes.length > 
maxResponseSizeBytes) {
+      Map<String, String> queryOptions = 
queryRequest.getQueryContext().getQueryOptions();
+      Long maxResponseSizeBytes = 
QueryOptionsUtils.getMaxServerResponseSizeBytes(queryOptions);
+      if (maxResponseSizeBytes != null && responseBytes != null && 
responseBytes.length > maxResponseSizeBytes) {
         String errMsg =
             String.format("Serialized query response size %d exceeds threshold 
%d for requestId %d from broker %s",
                 responseBytes.length, maxResponseSizeBytes, 
queryRequest.getRequestId(), queryRequest.getBrokerId());
@@ -187,133 +173,6 @@ public abstract class QueryScheduler {
         responseBytes = serializeResponse(queryRequest, instanceResponse);
       }
 
-      // Log the statistics
-      String tableNameWithType = queryRequest.getTableNameWithType();
-      long numDocsScanned =
-          
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.NUM_DOCS_SCANNED.getName(),
 INVALID_NUM_SCANNED));
-      long numEntriesScannedInFilter = Long.parseLong(
-          
responseMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(),
 INVALID_NUM_SCANNED));
-      long numEntriesScannedPostFilter = Long.parseLong(
-          
responseMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(),
 INVALID_NUM_SCANNED));
-      long numSegmentsProcessed = Long.parseLong(
-          
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), 
INVALID_SEGMENTS_COUNT));
-      long numSegmentsMatched = Long.parseLong(
-          
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), 
INVALID_SEGMENTS_COUNT));
-      long numSegmentsPrunedInvalid = Long.parseLong(
-          
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(),
 INVALID_SEGMENTS_COUNT));
-      long numSegmentsPrunedByLimit = Long.parseLong(
-          
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(),
 INVALID_SEGMENTS_COUNT));
-      long numSegmentsPrunedByValue = Long.parseLong(
-          
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(),
 INVALID_SEGMENTS_COUNT));
-      long numSegmentsConsuming = Long.parseLong(
-          
responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
 INVALID_SEGMENTS_COUNT));
-      long numConsumingSegmentsProcessed = Long.parseLong(
-          
responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
-              INVALID_SEGMENTS_COUNT));
-      long numConsumingSegmentsMatched = Long.parseLong(
-          
responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(),
 INVALID_SEGMENTS_COUNT));
-      long minConsumingFreshnessMs = Long.parseLong(
-          
responseMetadata.getOrDefault(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
 INVALID_FRESHNESS_MS));
-      int numResizes =
-          
Integer.parseInt(responseMetadata.getOrDefault(MetadataKey.NUM_RESIZES.getName(),
 INVALID_NUM_RESIZES));
-      long resizeTimeMs =
-          
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.RESIZE_TIME_MS.getName(),
 INVALID_RESIZE_TIME_MS));
-      long threadCpuTimeNs =
-          
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(),
 "0"));
-      long systemActivitiesCpuTimeNs =
-          
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(),
 "0"));
-      long responseSerializationCpuTimeNs =
-          
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(),
 "0"));
-      long totalCpuTimeNs = threadCpuTimeNs + systemActivitiesCpuTimeNs + 
responseSerializationCpuTimeNs;
-
-      if (numDocsScanned > 0) {
-        _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_DOCS_SCANNED, numDocsScanned);
-      }
-      if (numEntriesScannedInFilter > 0) {
-        _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_ENTRIES_SCANNED_IN_FILTER,
-            numEntriesScannedInFilter);
-      }
-      if (numEntriesScannedPostFilter > 0) {
-        _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_ENTRIES_SCANNED_POST_FILTER,
-            numEntriesScannedPostFilter);
-      }
-      if (numResizes > 0) {
-        _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_RESIZES, numResizes);
-      }
-      if (resizeTimeMs > 0) {
-        _serverMetrics.addValueToTableGauge(tableNameWithType, 
ServerGauge.RESIZE_TIME_MS, resizeTimeMs);
-      }
-      if (threadCpuTimeNs > 0) {
-        _serverMetrics.addTimedTableValue(tableNameWithType, 
ServerTimer.EXECUTION_THREAD_CPU_TIME_NS, threadCpuTimeNs,
-            TimeUnit.NANOSECONDS);
-        _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.TOTAL_THREAD_CPU_TIME_MILLIS,
-            TimeUnit.MILLISECONDS.convert(threadCpuTimeNs, 
TimeUnit.NANOSECONDS));
-      }
-      if (systemActivitiesCpuTimeNs > 0) {
-        _serverMetrics.addTimedTableValue(tableNameWithType, 
ServerTimer.SYSTEM_ACTIVITIES_CPU_TIME_NS,
-            systemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
-      }
-      if (responseSerializationCpuTimeNs > 0) {
-        _serverMetrics.addTimedTableValue(tableNameWithType, 
ServerTimer.RESPONSE_SER_CPU_TIME_NS,
-            responseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
-      }
-      if (totalCpuTimeNs > 0) {
-        _serverMetrics.addTimedTableValue(tableNameWithType, 
ServerTimer.TOTAL_CPU_TIME_NS, totalCpuTimeNs,
-            TimeUnit.NANOSECONDS);
-      }
-
-      TimerContext timerContext = queryRequest.getTimerContext();
-      int numSegmentsQueried = queryRequest.getSegmentsToQuery().size();
-      long schedulerWaitMs = 
timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT);
-
-      // Please keep the format as name=value comma-separated with no spaces
-      // Please add new entries at the end
-      if (_queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, 
numDocsScanned, numSegmentsPrunedInvalid)) {
-        LOGGER.info("Processed requestId={},table={},"
-                + 
"segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/"
-                + "invalid/limit/value)={}/{}/{}/{}/{}/{}/{}/{}/{},"
-                + 
"schedulerWaitMs={},reqDeserMs={},totalExecMs={},resSerMs={},totalTimeMs={},"
-                + 
"minConsumingFreshnessMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},"
-                + 
"threadCpuTimeNs(total/thread/sysActivity/resSer)={}/{}/{}/{}", requestId, 
tableNameWithType,
-            numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, 
numSegmentsConsuming,
-            numConsumingSegmentsProcessed, numConsumingSegmentsMatched, 
numSegmentsPrunedInvalid,
-            numSegmentsPrunedByLimit, numSegmentsPrunedByValue, 
schedulerWaitMs,
-            
timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION),
-            timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
-            
timerContext.getPhaseDurationMs(ServerQueryPhase.RESPONSE_SERIALIZATION),
-            
timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), 
minConsumingFreshnessMs,
-            queryRequest.getBrokerId(), numDocsScanned, 
numEntriesScannedInFilter, numEntriesScannedPostFilter, name(),
-            totalCpuTimeNs, threadCpuTimeNs, systemActivitiesCpuTimeNs, 
responseSerializationCpuTimeNs);
-
-        // Limit the dropping log message at most once per second.
-        if (_numDroppedLogRateLimiter.tryAcquire()) {
-          // NOTE: the reported number may not be accurate since we will be 
missing some increments happened between
-          // get() and set().
-          int numDroppedLog = _numDroppedLogCounter.get();
-          if (numDroppedLog > 0) {
-            LOGGER.info("{} logs were dropped. (log max rate per second: {})", 
numDroppedLog,
-                _queryLogRateLimiter.getRate());
-            _numDroppedLogCounter.set(0);
-          }
-        }
-      } else {
-        _numDroppedLogCounter.incrementAndGet();
-      }
-
-      if (minConsumingFreshnessMs > -1 && minConsumingFreshnessMs != 
Long.MAX_VALUE) {
-        _serverMetrics.addTimedTableValue(tableNameWithType, 
ServerTimer.FRESHNESS_LAG_MS,
-            (System.currentTimeMillis() - minConsumingFreshnessMs), 
TimeUnit.MILLISECONDS);
-      }
-      _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
-      _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed);
-      _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_SEGMENTS_MATCHED, numSegmentsMatched);
-      _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_SEGMENTS_PRUNED_INVALID,
-          numSegmentsPrunedInvalid);
-      _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_SEGMENTS_PRUNED_BY_LIMIT,
-          numSegmentsPrunedByLimit);
-      _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_SEGMENTS_PRUNED_BY_VALUE,
-          numSegmentsPrunedByValue);
-
       return responseBytes;
     } finally {
       Tracing.ThreadAccountantOps.clear();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index aae8da0fc6..b238d7de0c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -43,6 +43,7 @@ import org.apache.pinot.common.utils.TlsUtils;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;
 import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.logger.ServerQueryLogger;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.server.access.AccessControl;
@@ -61,6 +62,7 @@ public class GrpcQueryServer extends 
PinotQueryServerGrpc.PinotQueryServerImplBa
   private final ExecutorService _executorService =
       
Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
   private final AccessControl _accessControl;
+  private final ServerQueryLogger _queryLogger = 
ServerQueryLogger.getInstance();
 
   public GrpcQueryServer(int port, GrpcConfig config, TlsConfig tlsConfig, 
QueryExecutor queryExecutor,
       ServerMetrics serverMetrics, AccessControl accessControl) {
@@ -174,5 +176,10 @@ public class GrpcQueryServer extends 
PinotQueryServerGrpc.PinotQueryServerImplBa
     responseObserver.onNext(serverResponse);
     _serverMetrics.addMeteredGlobalValue(ServerMeter.GRPC_BYTES_SENT, 
serverResponse.getSerializedSize());
     responseObserver.onCompleted();
+
+    // Log the query
+    if (_queryLogger != null) {
+      _queryLogger.logQuery(queryRequest, instanceResponse, "GrpcQueryServer");
+    }
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index a6eae5b795..1a109358ae 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -46,6 +46,7 @@ import org.apache.pinot.core.operator.blocks.results.MetadataResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.executor.ResultsBlockStreamer;
+import org.apache.pinot.core.query.logger.ServerQueryLogger;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -149,12 +150,16 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
 
   private Future<Void> startExecution() {
     ResultsBlockConsumer resultsBlockConsumer = new ResultsBlockConsumer();
+    ServerQueryLogger queryLogger = ServerQueryLogger.getInstance();
     return _executorService.submit(() -> {
       try {
         if (_requests.size() == 1) {
           ServerQueryRequest request = _requests.get(0);
           InstanceResponseBlock instanceResponseBlock =
               _queryExecutor.execute(request, _executorService, resultsBlockConsumer);
+          if (queryLogger != null) {
+            queryLogger.logQuery(request, instanceResponseBlock, "MultistageEngine");
+          }
           // TODO: Revisit if we should treat all exceptions as query failure. Currently MERGE_RESPONSE_ERROR and
           //       SERVER_SEGMENT_MISSING_ERROR are counted as query failure.
           Map<Integer, String> exceptions = instanceResponseBlock.getExceptions();
@@ -180,6 +185,9 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
               try {
                 InstanceResponseBlock instanceResponseBlock =
                     _queryExecutor.execute(request, _executorService, resultsBlockConsumer);
+                if (queryLogger != null) {
+                  queryLogger.logQuery(request, instanceResponseBlock, "MultistageEngine");
+                }
                 Map<Integer, String> exceptions = instanceResponseBlock.getExceptions();
                 if (!exceptions.isEmpty()) {
                   // Drain the latch when receiving exception block and not wait for the other thread to finish
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
index 3ed6fd6f54..1774594b71 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
@@ -103,6 +103,17 @@ public class ServerConf {
     return _serverConf.getProperty(CONFIG_OF_INSTANCE_DATA_MANAGER_CLASS, DEFAULT_DATA_MANAGER_CLASS);
   }
 
+  public double getQueryLogMaxRate() {
+    Double queryLogMaxRate = _serverConf.getProperty(CONFIG_OF_QUERY_LOG_MAX_RATE, Double.class);
+    return queryLogMaxRate != null ? queryLogMaxRate
+        : _serverConf.getProperty(DEPRECATED_CONFIG_OF_QUERY_LOG_MAX_RATE, DEFAULT_QUERY_LOG_MAX_RATE);
+  }
+
+  public double getQueryLogDroppedReportMaxRate() {
+    return _serverConf.getProperty(CONFIG_OF_QUERY_LOG_DROPPED_REPORT_MAX_RATE,
+        DEFAULT_QUERY_LOG_DROPPED_REPORT_MAX_RATE);
+  }
+
   public String getQueryExecutorClassName() {
     return _serverConf.getProperty(CONFIG_OF_QUERY_EXECUTOR_CLASS, DEFAULT_QUERY_EXECUTOR_CLASS);
   }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index ece3d87428..d596566a55 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -36,6 +36,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.operator.transform.function.TransformFunction;
 import org.apache.pinot.core.operator.transform.function.TransformFunctionFactory;
 import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.logger.ServerQueryLogger;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.core.query.scheduler.QuerySchedulerFactory;
 import org.apache.pinot.core.transport.ChannelHandlerFactory;
@@ -99,7 +100,9 @@ public class ServerInstance {
     _instanceDataManager = PluginManager.get().createInstance(instanceDataManagerClassName);
     _instanceDataManager.init(serverConf.getInstanceDataManagerConfig(), helixManager, _serverMetrics);
 
-    // Initialize FunctionRegistry before starting the query executor
+    // Initialize ServerQueryLogger and FunctionRegistry before starting the query executor
+    ServerQueryLogger.init(serverConf.getQueryLogMaxRate(), serverConf.getQueryLogDroppedReportMaxRate(),
+        _serverMetrics);
     FunctionRegistry.init();
     String queryExecutorClassName = serverConf.getQueryExecutorClassName();
     LOGGER.info("Initializing query executor of class: {}", queryExecutorClassName);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index b5064fad3c..6c8c117e82 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -521,6 +521,18 @@ public class CommonConstants {
     public static final String CONFIG_OF_INSTANCE_RELOAD_CONSUMING_SEGMENT =
         "pinot.server.instance.reload.consumingSegment";
     public static final String CONFIG_OF_INSTANCE_DATA_MANAGER_CLASS = "pinot.server.instance.data.manager.class";
+
+    // Query logger related configs
+    public static final String CONFIG_OF_QUERY_LOG_MAX_RATE = "pinot.server.query.log.maxRatePerSecond";
+    @Deprecated
+    public static final String DEPRECATED_CONFIG_OF_QUERY_LOG_MAX_RATE =
+        "pinot.query.scheduler.query.log.maxRatePerSecond";
+    public static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000;
+    public static final String CONFIG_OF_QUERY_LOG_DROPPED_REPORT_MAX_RATE =
+        "pinot.server.query.log.droppedReportMaxRatePerSecond";
+    public static final double DEFAULT_QUERY_LOG_DROPPED_REPORT_MAX_RATE = 1;
+
+    // Query executor related configs
     public static final String CONFIG_OF_QUERY_EXECUTOR_CLASS = "pinot.server.query.executor.class";
     public static final String CONFIG_OF_QUERY_EXECUTOR_PRUNER_CLASS = "pinot.server.query.executor.pruner.class";
     public static final String CONFIG_OF_QUERY_EXECUTOR_PLAN_MAKER_CLASS =
@@ -530,6 +542,7 @@ public class CommonConstants {
         "pinot.server.query.executor.num.groups.limit";
     public static final String CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
         "pinot.server.query.executor.max.init.group.holder.capacity";
+
     public static final String CONFIG_OF_TRANSFORM_FUNCTIONS = "pinot.server.transforms";
     public static final String CONFIG_OF_SERVER_QUERY_REWRITER_CLASS_NAMES = "pinot.server.query.rewriter.class.names";
     public static final String CONFIG_OF_ENABLE_QUERY_CANCELLATION = "pinot.server.enable.query.cancellation";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to