This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 996f060615 Log query and emit metrics for grpc request and multi-stage
leaf stage (#11838)
996f060615 is described below
commit 996f0606151b2654a66f67f1661a417fc9153f97
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Oct 20 17:20:30 2023 -0700
Log query and emit metrics for grpc request and multi-stage leaf stage
(#11838)
---
.../jmx_prometheus_javaagent/configs/server.yml | 6 +
.../apache/pinot/common/metrics/ServerGauge.java | 1 -
.../apache/pinot/common/metrics/ServerMeter.java | 1 +
.../pinot/core/query/logger/ServerQueryLogger.java | 227 +++++++++++++++++++++
.../pinot/core/query/scheduler/QueryScheduler.java | 161 +--------------
.../pinot/core/transport/grpc/GrpcQueryServer.java | 7 +
.../LeafStageTransferableBlockOperator.java | 8 +
.../org/apache/pinot/server/conf/ServerConf.java | 11 +
.../pinot/server/starter/ServerInstance.java | 5 +-
.../apache/pinot/spi/utils/CommonConstants.java | 13 ++
10 files changed, 287 insertions(+), 153 deletions(-)
diff --git
a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/server.yml
b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/server.yml
index 4da3548947..5e63275923 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/server.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/server.yml
@@ -112,6 +112,12 @@ rules:
cache: true
labels:
table: "$1"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\",
name=\"pinot.server.numResizes.([^\\.]*?)_(OFFLINE|REALTIME)\"><>(\\w+)"
+ name: "pinot_server_numResizes_$3"
+ cache: true
+ labels:
+ table: "$1"
+ tableType: "$2"
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\",
name=\"pinot.server.resizeTimeMs.([^\\.]*?)_(OFFLINE|REALTIME)\"><>(\\w+)"
name: "pinot_server_resizeTimeMs_$3"
cache: true
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 626c378146..45f34803a0 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -40,7 +40,6 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
REALTIME_OFFHEAP_MEMORY_USED("bytes", false),
REALTIME_SEGMENT_NUM_PARTITIONS("realtimeSegmentNumPartitions", false),
LLC_SIMULTANEOUS_SEGMENT_BUILDS("llcSimultaneousSegmentBuilds", true),
- RESIZE_TIME_MS("milliseconds", false),
// Upsert metrics
UPSERT_PRIMARY_KEYS_COUNT("upsertPrimaryKeysCount", false),
// Dedup metrics
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index e66199dcf5..f77194d37b 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -81,6 +81,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
// Emitted only by Server to Deep-store segment uploader.
SEGMENT_UPLOAD_TIMEOUT("segments", false),
NUM_RESIZES("numResizes", false),
+ RESIZE_TIME_MS("resizeTimeMs", false),
NO_TABLE_ACCESS("tables", true),
INDEXING_FAILURES("attributeValues", true),
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java
new file mode 100644
index 0000000000..2d72fc04ef
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.logger;
+
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.metrics.ServerTimer;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.TimerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@SuppressWarnings("UnstableApiUsage")
+public class ServerQueryLogger {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ServerQueryLogger.class);
+ private static final AtomicReference<ServerQueryLogger> INSTANCE = new
AtomicReference<>();
+
+ private final ServerMetrics _serverMetrics;
+ private final RateLimiter _queryLogRateLimiter;
+ private final RateLimiter _droppedReportRateLimiter;
+ private final AtomicInteger _numDroppedLogs = new AtomicInteger();
+
+ public static void init(double queryLogMaxRate, double droppedReportMaxRate,
ServerMetrics serverMetrics) {
+ if (INSTANCE.compareAndSet(null, new ServerQueryLogger(queryLogMaxRate,
droppedReportMaxRate, serverMetrics))) {
+ LOGGER.info("Initialized ServerQueryLogger with query log max rate: {},
dropped report max rate: {}",
+ queryLogMaxRate, droppedReportMaxRate);
+ } else {
+ LOGGER.error("ServerQueryLogger is already initialized, not initializing
it again");
+ }
+ }
+
+ @Nullable
+ public static ServerQueryLogger getInstance() {
+ // NOTE: In some tests, ServerQueryLogger might not be initialized.
Returns null when it is not initialized.
+ return INSTANCE.get();
+ }
+
+ private ServerQueryLogger(double queryLogMaxRate, double
droppedReportMaxRate, ServerMetrics serverMetrics) {
+ _serverMetrics = serverMetrics;
+ _queryLogRateLimiter = RateLimiter.create(queryLogMaxRate);
+ _droppedReportRateLimiter = RateLimiter.create(droppedReportMaxRate);
+ }
+
+ public void logQuery(ServerQueryRequest request, InstanceResponseBlock
response, String schedulerType) {
+ String tableNameWithType = request.getTableNameWithType();
+ Map<String, String> responseMetadata = response.getResponseMetadata();
+
+ long numDocsScanned = getLongValue(responseMetadata,
MetadataKey.NUM_DOCS_SCANNED.getName(), -1);
+ addToTableMeter(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED,
numDocsScanned);
+
+ long numEntriesScannedInFilter =
+ getLongValue(responseMetadata,
MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), -1);
+ addToTableMeter(tableNameWithType,
ServerMeter.NUM_ENTRIES_SCANNED_IN_FILTER, numEntriesScannedInFilter);
+
+ long numEntriesScannedPostFilter =
+ getLongValue(responseMetadata,
MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), -1);
+ addToTableMeter(tableNameWithType,
ServerMeter.NUM_ENTRIES_SCANNED_POST_FILTER, numEntriesScannedPostFilter);
+
+ int numSegmentsQueried = request.getSegmentsToQuery().size();
+ addToTableMeter(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED,
numSegmentsQueried);
+
+ long numSegmentsProcessed = getLongValue(responseMetadata,
MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), -1);
+ addToTableMeter(tableNameWithType, ServerMeter.NUM_SEGMENTS_PROCESSED,
numSegmentsProcessed);
+
+ long numSegmentsMatched = getLongValue(responseMetadata,
MetadataKey.NUM_SEGMENTS_MATCHED.getName(), -1);
+ addToTableMeter(tableNameWithType, ServerMeter.NUM_SEGMENTS_MATCHED,
numSegmentsMatched);
+
+ long numSegmentsPrunedInvalid =
+ getLongValue(responseMetadata,
MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), -1);
+ addToTableMeter(tableNameWithType,
ServerMeter.NUM_SEGMENTS_PRUNED_INVALID, numSegmentsPrunedInvalid);
+
+ long numSegmentsPrunedByLimit =
+ getLongValue(responseMetadata,
MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), -1);
+ addToTableMeter(tableNameWithType,
ServerMeter.NUM_SEGMENTS_PRUNED_BY_LIMIT, numSegmentsPrunedByLimit);
+
+ long numSegmentsPrunedByValue =
+ getLongValue(responseMetadata,
MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), -1);
+ addToTableMeter(tableNameWithType,
ServerMeter.NUM_SEGMENTS_PRUNED_BY_VALUE, numSegmentsPrunedByValue);
+
+ long numConsumingSegmentsQueried =
+ getLongValue(responseMetadata,
MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), -1);
+ long numConsumingSegmentsProcessed =
+ getLongValue(responseMetadata,
MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), -1);
+ long numConsumingSegmentsMatched =
+ getLongValue(responseMetadata,
MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), -1);
+
+ long minConsumingFreshnessMs =
+ getLongValue(responseMetadata,
MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), -1);
+ if (minConsumingFreshnessMs > 0 && minConsumingFreshnessMs !=
Long.MAX_VALUE) {
+ _serverMetrics.addTimedTableValue(tableNameWithType,
ServerTimer.FRESHNESS_LAG_MS,
+ (System.currentTimeMillis() - minConsumingFreshnessMs),
TimeUnit.MILLISECONDS);
+ }
+
+ long numResizes = getLongValue(responseMetadata,
MetadataKey.NUM_RESIZES.getName(), -1);
+ addToTableMeter(tableNameWithType, ServerMeter.NUM_RESIZES, numResizes);
+
+ long resizeTimeMs = getLongValue(responseMetadata,
MetadataKey.RESIZE_TIME_MS.getName(), -1);
+ addToTableMeter(tableNameWithType, ServerMeter.RESIZE_TIME_MS,
resizeTimeMs);
+
+ long threadCpuTimeNs = getLongValue(responseMetadata,
MetadataKey.THREAD_CPU_TIME_NS.getName(), 0);
+ if (threadCpuTimeNs > 0) {
+ _serverMetrics.addTimedTableValue(tableNameWithType,
ServerTimer.EXECUTION_THREAD_CPU_TIME_NS, threadCpuTimeNs,
+ TimeUnit.NANOSECONDS);
+ _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.TOTAL_THREAD_CPU_TIME_MILLIS,
+ TimeUnit.MILLISECONDS.convert(threadCpuTimeNs,
TimeUnit.NANOSECONDS));
+ }
+
+ long systemActivitiesCpuTimeNs =
+ getLongValue(responseMetadata,
MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(), 0);
+ if (systemActivitiesCpuTimeNs > 0) {
+ _serverMetrics.addTimedTableValue(tableNameWithType,
ServerTimer.SYSTEM_ACTIVITIES_CPU_TIME_NS,
+ systemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
+ }
+
+ long responseSerializationCpuTimeNs =
+ getLongValue(responseMetadata,
MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), 0);
+ if (responseSerializationCpuTimeNs > 0) {
+ _serverMetrics.addTimedTableValue(tableNameWithType,
ServerTimer.RESPONSE_SER_CPU_TIME_NS,
+ responseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
+ }
+
+ long totalCpuTimeNs = threadCpuTimeNs + systemActivitiesCpuTimeNs +
responseSerializationCpuTimeNs;
+ if (totalCpuTimeNs > 0) {
+ _serverMetrics.addTimedTableValue(tableNameWithType,
ServerTimer.TOTAL_CPU_TIME_NS, totalCpuTimeNs,
+ TimeUnit.NANOSECONDS);
+ }
+
+ TimerContext timerContext = request.getTimerContext();
+ long schedulerWaitMs =
timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT);
+
+ // Please keep the format as name=value comma-separated with no spaces
+ // Please add new entries at the end
+ if (_queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs,
numDocsScanned, numSegmentsPrunedInvalid)) {
+ LOGGER.info("Processed requestId={},table={},"
+ +
"segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/"
+ + "invalid/limit/value)={}/{}/{}/{}/{}/{}/{}/{}/{},"
+ +
"schedulerWaitMs={},reqDeserMs={},totalExecMs={},resSerMs={},totalTimeMs={},"
+ +
"minConsumingFreshnessMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},"
+ +
"threadCpuTimeNs(total/thread/sysActivity/resSer)={}/{}/{}/{}",
request.getRequestId(),
+ tableNameWithType,
+ numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched,
numConsumingSegmentsQueried,
+ numConsumingSegmentsProcessed, numConsumingSegmentsMatched,
numSegmentsPrunedInvalid,
+ numSegmentsPrunedByLimit, numSegmentsPrunedByValue, schedulerWaitMs,
+
timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION),
+ timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
+
timerContext.getPhaseDurationMs(ServerQueryPhase.RESPONSE_SERIALIZATION),
+ timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME),
minConsumingFreshnessMs,
+ request.getBrokerId(), numDocsScanned, numEntriesScannedInFilter,
numEntriesScannedPostFilter, schedulerType,
+ totalCpuTimeNs, threadCpuTimeNs, systemActivitiesCpuTimeNs,
responseSerializationCpuTimeNs);
+
+ // Limit the dropped-log report to the configured dropped report max rate.
+ if (_droppedReportRateLimiter.tryAcquire()) {
+ // NOTE: the reported number may not be accurate since we will be
missing some increments that happened between
+ // get() and set().
+ int numDroppedLogs = _numDroppedLogs.get();
+ if (numDroppedLogs > 0) {
+ LOGGER.info("{} logs were dropped. (log max rate per second: {})",
numDroppedLogs,
+ _queryLogRateLimiter.getRate());
+ _numDroppedLogs.set(0);
+ }
+ }
+ } else {
+ _numDroppedLogs.getAndIncrement();
+ }
+ }
+
+ private static long getLongValue(Map<String, String> metadata, String key,
long defaultValue) {
+ String value = metadata.get(key);
+ return value != null ? Long.parseLong(value) : defaultValue;
+ }
+
+ private void addToTableMeter(String tableNameWithType, ServerMeter meter,
long value) {
+ if (value > 0) {
+ _serverMetrics.addMeteredTableValue(tableNameWithType, meter, value);
+ }
+ }
+
+ /**
+ * Returns {@code true} when the query should be logged even if the query
log rate is reached.
+ *
+ * TODO: come up with other criteria for forcing a log and come up with
better numbers.
+ */
+ private static boolean forceLog(long schedulerWaitMs, long numDocsScanned,
long numSegmentsPrunedInvalid) {
+ // If scheduler wait time is larger than 100ms, force the log.
+ if (schedulerWaitMs > 100) {
+ return true;
+ }
+
+ // If the number of documents scanned is larger than 1 million, force
the log.
+ if (numDocsScanned > 1_000_000) {
+ return true;
+ }
+
+ // If there are invalid segments, force the log.
+ if (numSegmentsPrunedInvalid > 0) {
+ return true;
+ }
+
+ return false;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 693e45c171..8f5bcb887f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -22,24 +22,20 @@ import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
-import com.google.common.util.concurrent.RateLimiter;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAccumulator;
import javax.annotation.Nullable;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
-import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.logger.ServerQueryLogger;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.TimerContext;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
@@ -53,24 +49,15 @@ import org.slf4j.LoggerFactory;
* Abstract class providing common scheduler functionality
* including query runner and query worker pool
*/
-@SuppressWarnings("UnstableApiUsage")
public abstract class QueryScheduler {
private static final Logger LOGGER =
LoggerFactory.getLogger(QueryScheduler.class);
- private static final String INVALID_NUM_SCANNED = "-1";
- private static final String INVALID_SEGMENTS_COUNT = "-1";
- private static final String INVALID_FRESHNESS_MS = "-1";
- private static final String INVALID_NUM_RESIZES = "-1";
- private static final String INVALID_RESIZE_TIME_MS = "-1";
- private static final String QUERY_LOG_MAX_RATE_KEY =
"query.log.maxRatePerSecond";
- private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
protected final ServerMetrics _serverMetrics;
protected final QueryExecutor _queryExecutor;
protected final ResourceManager _resourceManager;
protected final LongAccumulator _latestQueryTime;
- private final RateLimiter _queryLogRateLimiter;
- private final RateLimiter _numDroppedLogRateLimiter;
- private final AtomicInteger _numDroppedLogCounter;
+ protected final ServerQueryLogger _queryLogger =
ServerQueryLogger.getInstance();
+
protected volatile boolean _isRunning = false;
/**
@@ -91,10 +78,6 @@ public abstract class QueryScheduler {
_resourceManager = resourceManager;
_queryExecutor = queryExecutor;
_latestQueryTime = latestQueryTime;
- _queryLogRateLimiter =
RateLimiter.create(config.getProperty(QUERY_LOG_MAX_RATE_KEY,
DEFAULT_QUERY_LOG_MAX_RATE));
- _numDroppedLogRateLimiter = RateLimiter.create(1.0d);
- _numDroppedLogCounter = new AtomicInteger(0);
- LOGGER.info("Query log max rate: {}", _queryLogRateLimiter.getRate());
}
/**
@@ -166,14 +149,17 @@ public abstract class QueryScheduler {
long requestId = queryRequest.getRequestId();
Map<String, String> responseMetadata =
instanceResponse.getResponseMetadata();
responseMetadata.put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
-
byte[] responseBytes = serializeResponse(queryRequest, instanceResponse);
- Map<String, String> queryOptions =
queryRequest.getQueryContext().getQueryOptions();
- Long maxResponseSizeBytes =
QueryOptionsUtils.getMaxServerResponseSizeBytes(queryOptions);
+ // Log the statistics
+ if (_queryLogger != null) {
+ _queryLogger.logQuery(queryRequest, instanceResponse, name());
+ }
// TODO: Perform this check sooner during the serialization of DataTable.
- if (maxResponseSizeBytes != null && responseBytes.length >
maxResponseSizeBytes) {
+ Map<String, String> queryOptions =
queryRequest.getQueryContext().getQueryOptions();
+ Long maxResponseSizeBytes =
QueryOptionsUtils.getMaxServerResponseSizeBytes(queryOptions);
+ if (maxResponseSizeBytes != null && responseBytes != null &&
responseBytes.length > maxResponseSizeBytes) {
String errMsg =
String.format("Serialized query response size %d exceeds threshold
%d for requestId %d from broker %s",
responseBytes.length, maxResponseSizeBytes,
queryRequest.getRequestId(), queryRequest.getBrokerId());
@@ -187,133 +173,6 @@ public abstract class QueryScheduler {
responseBytes = serializeResponse(queryRequest, instanceResponse);
}
- // Log the statistics
- String tableNameWithType = queryRequest.getTableNameWithType();
- long numDocsScanned =
-
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.NUM_DOCS_SCANNED.getName(),
INVALID_NUM_SCANNED));
- long numEntriesScannedInFilter = Long.parseLong(
-
responseMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(),
INVALID_NUM_SCANNED));
- long numEntriesScannedPostFilter = Long.parseLong(
-
responseMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(),
INVALID_NUM_SCANNED));
- long numSegmentsProcessed = Long.parseLong(
-
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(),
INVALID_SEGMENTS_COUNT));
- long numSegmentsMatched = Long.parseLong(
-
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(),
INVALID_SEGMENTS_COUNT));
- long numSegmentsPrunedInvalid = Long.parseLong(
-
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(),
INVALID_SEGMENTS_COUNT));
- long numSegmentsPrunedByLimit = Long.parseLong(
-
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(),
INVALID_SEGMENTS_COUNT));
- long numSegmentsPrunedByValue = Long.parseLong(
-
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(),
INVALID_SEGMENTS_COUNT));
- long numSegmentsConsuming = Long.parseLong(
-
responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
INVALID_SEGMENTS_COUNT));
- long numConsumingSegmentsProcessed = Long.parseLong(
-
responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
- INVALID_SEGMENTS_COUNT));
- long numConsumingSegmentsMatched = Long.parseLong(
-
responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(),
INVALID_SEGMENTS_COUNT));
- long minConsumingFreshnessMs = Long.parseLong(
-
responseMetadata.getOrDefault(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
INVALID_FRESHNESS_MS));
- int numResizes =
-
Integer.parseInt(responseMetadata.getOrDefault(MetadataKey.NUM_RESIZES.getName(),
INVALID_NUM_RESIZES));
- long resizeTimeMs =
-
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.RESIZE_TIME_MS.getName(),
INVALID_RESIZE_TIME_MS));
- long threadCpuTimeNs =
-
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(),
"0"));
- long systemActivitiesCpuTimeNs =
-
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(),
"0"));
- long responseSerializationCpuTimeNs =
-
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(),
"0"));
- long totalCpuTimeNs = threadCpuTimeNs + systemActivitiesCpuTimeNs +
responseSerializationCpuTimeNs;
-
- if (numDocsScanned > 0) {
- _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_DOCS_SCANNED, numDocsScanned);
- }
- if (numEntriesScannedInFilter > 0) {
- _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_ENTRIES_SCANNED_IN_FILTER,
- numEntriesScannedInFilter);
- }
- if (numEntriesScannedPostFilter > 0) {
- _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_ENTRIES_SCANNED_POST_FILTER,
- numEntriesScannedPostFilter);
- }
- if (numResizes > 0) {
- _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_RESIZES, numResizes);
- }
- if (resizeTimeMs > 0) {
- _serverMetrics.addValueToTableGauge(tableNameWithType,
ServerGauge.RESIZE_TIME_MS, resizeTimeMs);
- }
- if (threadCpuTimeNs > 0) {
- _serverMetrics.addTimedTableValue(tableNameWithType,
ServerTimer.EXECUTION_THREAD_CPU_TIME_NS, threadCpuTimeNs,
- TimeUnit.NANOSECONDS);
- _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.TOTAL_THREAD_CPU_TIME_MILLIS,
- TimeUnit.MILLISECONDS.convert(threadCpuTimeNs,
TimeUnit.NANOSECONDS));
- }
- if (systemActivitiesCpuTimeNs > 0) {
- _serverMetrics.addTimedTableValue(tableNameWithType,
ServerTimer.SYSTEM_ACTIVITIES_CPU_TIME_NS,
- systemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
- }
- if (responseSerializationCpuTimeNs > 0) {
- _serverMetrics.addTimedTableValue(tableNameWithType,
ServerTimer.RESPONSE_SER_CPU_TIME_NS,
- responseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
- }
- if (totalCpuTimeNs > 0) {
- _serverMetrics.addTimedTableValue(tableNameWithType,
ServerTimer.TOTAL_CPU_TIME_NS, totalCpuTimeNs,
- TimeUnit.NANOSECONDS);
- }
-
- TimerContext timerContext = queryRequest.getTimerContext();
- int numSegmentsQueried = queryRequest.getSegmentsToQuery().size();
- long schedulerWaitMs =
timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT);
-
- // Please keep the format as name=value comma-separated with no spaces
- // Please add new entries at the end
- if (_queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs,
numDocsScanned, numSegmentsPrunedInvalid)) {
- LOGGER.info("Processed requestId={},table={},"
- +
"segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/"
- + "invalid/limit/value)={}/{}/{}/{}/{}/{}/{}/{}/{},"
- +
"schedulerWaitMs={},reqDeserMs={},totalExecMs={},resSerMs={},totalTimeMs={},"
- +
"minConsumingFreshnessMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},"
- +
"threadCpuTimeNs(total/thread/sysActivity/resSer)={}/{}/{}/{}", requestId,
tableNameWithType,
- numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched,
numSegmentsConsuming,
- numConsumingSegmentsProcessed, numConsumingSegmentsMatched,
numSegmentsPrunedInvalid,
- numSegmentsPrunedByLimit, numSegmentsPrunedByValue,
schedulerWaitMs,
-
timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION),
- timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
-
timerContext.getPhaseDurationMs(ServerQueryPhase.RESPONSE_SERIALIZATION),
-
timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME),
minConsumingFreshnessMs,
- queryRequest.getBrokerId(), numDocsScanned,
numEntriesScannedInFilter, numEntriesScannedPostFilter, name(),
- totalCpuTimeNs, threadCpuTimeNs, systemActivitiesCpuTimeNs,
responseSerializationCpuTimeNs);
-
- // Limit the dropping log message at most once per second.
- if (_numDroppedLogRateLimiter.tryAcquire()) {
- // NOTE: the reported number may not be accurate since we will be
missing some increments happened between
- // get() and set().
- int numDroppedLog = _numDroppedLogCounter.get();
- if (numDroppedLog > 0) {
- LOGGER.info("{} logs were dropped. (log max rate per second: {})",
numDroppedLog,
- _queryLogRateLimiter.getRate());
- _numDroppedLogCounter.set(0);
- }
- }
- } else {
- _numDroppedLogCounter.incrementAndGet();
- }
-
- if (minConsumingFreshnessMs > -1 && minConsumingFreshnessMs !=
Long.MAX_VALUE) {
- _serverMetrics.addTimedTableValue(tableNameWithType,
ServerTimer.FRESHNESS_LAG_MS,
- (System.currentTimeMillis() - minConsumingFreshnessMs),
TimeUnit.MILLISECONDS);
- }
- _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
- _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed);
- _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_SEGMENTS_MATCHED, numSegmentsMatched);
- _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_SEGMENTS_PRUNED_INVALID,
- numSegmentsPrunedInvalid);
- _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_SEGMENTS_PRUNED_BY_LIMIT,
- numSegmentsPrunedByLimit);
- _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_SEGMENTS_PRUNED_BY_VALUE,
- numSegmentsPrunedByValue);
-
return responseBytes;
} finally {
Tracing.ThreadAccountantOps.clear();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index aae8da0fc6..b238d7de0c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -43,6 +43,7 @@ import org.apache.pinot.common.utils.TlsUtils;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;
import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.logger.ServerQueryLogger;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.server.access.AccessControl;
@@ -61,6 +62,7 @@ public class GrpcQueryServer extends
PinotQueryServerGrpc.PinotQueryServerImplBa
private final ExecutorService _executorService =
Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
private final AccessControl _accessControl;
+ private final ServerQueryLogger _queryLogger =
ServerQueryLogger.getInstance();
public GrpcQueryServer(int port, GrpcConfig config, TlsConfig tlsConfig,
QueryExecutor queryExecutor,
ServerMetrics serverMetrics, AccessControl accessControl) {
@@ -174,5 +176,10 @@ public class GrpcQueryServer extends
PinotQueryServerGrpc.PinotQueryServerImplBa
responseObserver.onNext(serverResponse);
_serverMetrics.addMeteredGlobalValue(ServerMeter.GRPC_BYTES_SENT,
serverResponse.getSerializedSize());
responseObserver.onCompleted();
+
+ // Log the query
+ if (_queryLogger != null) {
+ _queryLogger.logQuery(queryRequest, instanceResponse, "GrpcQueryServer");
+ }
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index a6eae5b795..1a109358ae 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -46,6 +46,7 @@ import
org.apache.pinot.core.operator.blocks.results.MetadataResultsBlock;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.executor.ResultsBlockStreamer;
+import org.apache.pinot.core.query.logger.ServerQueryLogger;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -149,12 +150,16 @@ public class LeafStageTransferableBlockOperator extends
MultiStageOperator {
private Future<Void> startExecution() {
ResultsBlockConsumer resultsBlockConsumer = new ResultsBlockConsumer();
+ ServerQueryLogger queryLogger = ServerQueryLogger.getInstance();
return _executorService.submit(() -> {
try {
if (_requests.size() == 1) {
ServerQueryRequest request = _requests.get(0);
InstanceResponseBlock instanceResponseBlock =
_queryExecutor.execute(request, _executorService,
resultsBlockConsumer);
+ if (queryLogger != null) {
+ queryLogger.logQuery(request, instanceResponseBlock,
"MultistageEngine");
+ }
// TODO: Revisit if we should treat all exceptions as query failure.
Currently MERGE_RESPONSE_ERROR and
// SERVER_SEGMENT_MISSING_ERROR are counted as query failure.
Map<Integer, String> exceptions =
instanceResponseBlock.getExceptions();
@@ -180,6 +185,9 @@ public class LeafStageTransferableBlockOperator extends
MultiStageOperator {
try {
InstanceResponseBlock instanceResponseBlock =
_queryExecutor.execute(request, _executorService,
resultsBlockConsumer);
+ if (queryLogger != null) {
+ queryLogger.logQuery(request, instanceResponseBlock,
"MultistageEngine");
+ }
Map<Integer, String> exceptions =
instanceResponseBlock.getExceptions();
if (!exceptions.isEmpty()) {
// Drain the latch when receiving exception block and not
wait for the other thread to finish
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
index 3ed6fd6f54..1774594b71 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
@@ -103,6 +103,17 @@ public class ServerConf {
return _serverConf.getProperty(CONFIG_OF_INSTANCE_DATA_MANAGER_CLASS,
DEFAULT_DATA_MANAGER_CLASS);
}
+ public double getQueryLogMaxRate() {
+ Double queryLogMaxRate =
_serverConf.getProperty(CONFIG_OF_QUERY_LOG_MAX_RATE, Double.class);
+ return queryLogMaxRate != null ? queryLogMaxRate
+ : _serverConf.getProperty(DEPRECATED_CONFIG_OF_QUERY_LOG_MAX_RATE,
DEFAULT_QUERY_LOG_MAX_RATE);
+ }
+
+ public double getQueryLogDroppedReportMaxRate() {
+ return _serverConf.getProperty(CONFIG_OF_QUERY_LOG_DROPPED_REPORT_MAX_RATE,
+ DEFAULT_QUERY_LOG_DROPPED_REPORT_MAX_RATE);
+ }
+
public String getQueryExecutorClassName() {
return _serverConf.getProperty(CONFIG_OF_QUERY_EXECUTOR_CLASS,
DEFAULT_QUERY_EXECUTOR_CLASS);
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index ece3d87428..d596566a55 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -36,6 +36,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
import
org.apache.pinot.core.operator.transform.function.TransformFunctionFactory;
import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.logger.ServerQueryLogger;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.core.query.scheduler.QuerySchedulerFactory;
import org.apache.pinot.core.transport.ChannelHandlerFactory;
@@ -99,7 +100,9 @@ public class ServerInstance {
_instanceDataManager =
PluginManager.get().createInstance(instanceDataManagerClassName);
_instanceDataManager.init(serverConf.getInstanceDataManagerConfig(),
helixManager, _serverMetrics);
- // Initialize FunctionRegistry before starting the query executor
+ // Initialize ServerQueryLogger and FunctionRegistry before starting the query executor
+ ServerQueryLogger.init(serverConf.getQueryLogMaxRate(),
serverConf.getQueryLogDroppedReportMaxRate(),
+ _serverMetrics);
FunctionRegistry.init();
String queryExecutorClassName = serverConf.getQueryExecutorClassName();
LOGGER.info("Initializing query executor of class: {}",
queryExecutorClassName);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index b5064fad3c..6c8c117e82 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -521,6 +521,18 @@ public class CommonConstants {
public static final String CONFIG_OF_INSTANCE_RELOAD_CONSUMING_SEGMENT =
"pinot.server.instance.reload.consumingSegment";
public static final String CONFIG_OF_INSTANCE_DATA_MANAGER_CLASS =
"pinot.server.instance.data.manager.class";
+
+ // Query logger related configs
+ public static final String CONFIG_OF_QUERY_LOG_MAX_RATE =
"pinot.server.query.log.maxRatePerSecond";
+ @Deprecated
+ public static final String DEPRECATED_CONFIG_OF_QUERY_LOG_MAX_RATE =
+ "pinot.query.scheduler.query.log.maxRatePerSecond";
+ public static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000;
+ public static final String CONFIG_OF_QUERY_LOG_DROPPED_REPORT_MAX_RATE =
+ "pinot.server.query.log.droppedReportMaxRatePerSecond";
+ public static final double DEFAULT_QUERY_LOG_DROPPED_REPORT_MAX_RATE = 1;
+
+ // Query executor related configs
public static final String CONFIG_OF_QUERY_EXECUTOR_CLASS =
"pinot.server.query.executor.class";
public static final String CONFIG_OF_QUERY_EXECUTOR_PRUNER_CLASS =
"pinot.server.query.executor.pruner.class";
public static final String CONFIG_OF_QUERY_EXECUTOR_PLAN_MAKER_CLASS =
@@ -530,6 +542,7 @@ public class CommonConstants {
"pinot.server.query.executor.num.groups.limit";
public static final String
CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
"pinot.server.query.executor.max.init.group.holder.capacity";
+
public static final String CONFIG_OF_TRANSFORM_FUNCTIONS =
"pinot.server.transforms";
public static final String CONFIG_OF_SERVER_QUERY_REWRITER_CLASS_NAMES =
"pinot.server.query.rewriter.class.names";
public static final String CONFIG_OF_ENABLE_QUERY_CANCELLATION =
"pinot.server.enable.query.cancellation";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]