This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bbeb1dac1a Broker Query Timeout Metric (#11892)
bbeb1dac1a is described below
commit bbeb1dac1ab8b5e1aa91621a3c38a04a234cb830
Author: Prashant Pandey <[email protected]>
AuthorDate: Fri Nov 3 07:26:14 2023 +0100
Broker Query Timeout Metric (#11892)
---
.../etc/jmx_prometheus_javaagent/configs/broker.yml | 5 +++++
.../MultiStageBrokerRequestHandler.java | 20 +++++++++++++-------
.../SingleConnectionBrokerRequestHandler.java | 4 ++++
.../org/apache/pinot/common/metrics/BrokerMeter.java | 3 +++
4 files changed, 25 insertions(+), 7 deletions(-)
diff --git
a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/broker.yml
b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/broker.yml
index 543dabcd15..04824fbb5c 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/broker.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/broker.yml
@@ -103,6 +103,11 @@ rules:
cache: true
labels:
table: "$1"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"BrokerMetrics\",
name=\"pinot.broker.([^\\.]*?)\\.brokerResponsesWithTimeouts\"><>(\\w+)"
+ name: "pinot_broker_brokerResponsesWithTimeouts_$2"
+ cache: true
+ labels:
+ table: "$1"
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"BrokerMetrics\",
name=\"pinot.broker.([^\\.]*?)\\.noServerFoundExceptions\"><>(\\w+)"
name: "pinot_broker_noServerFoundExceptions_$2"
cache: true
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index ccb0e405ab..f3cbaffb1b 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.HttpHeaders;
@@ -81,13 +82,11 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
private final MailboxService _mailboxService;
private final QueryDispatcher _queryDispatcher;
-
- public MultiStageBrokerRequestHandler(PinotConfiguration config, String
brokerId,
- BrokerRoutingManager routingManager, AccessControlFactory
accessControlFactory,
- QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics,
- BrokerQueryEventListener brokerQueryEventListener) {
- super(config, brokerId, routingManager, accessControlFactory,
queryQuotaManager, tableCache,
- brokerMetrics, brokerQueryEventListener);
+ public MultiStageBrokerRequestHandler(PinotConfiguration config, String
brokerId, BrokerRoutingManager routingManager,
+ AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
+ BrokerMetrics brokerMetrics, BrokerQueryEventListener
brokerQueryEventListener) {
+ super(config, brokerId, routingManager, accessControlFactory,
queryQuotaManager, tableCache, brokerMetrics,
+ brokerQueryEventListener);
LOGGER.info("Using Multi-stage BrokerRequestHandler.");
String hostname =
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port =
Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
@@ -190,6 +189,13 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
try {
queryResults = _queryDispatcher.submitAndReduce(requestContext,
dispatchableSubPlan, queryTimeoutMs, queryOptions,
stageIdStatsMap);
+ } catch (TimeoutException e) {
+ for (String table : tableNames) {
+ _brokerMetrics.addMeteredTableValue(table,
BrokerMeter.BROKER_RESPONSES_WITH_TIMEOUTS, 1);
+ }
+ LOGGER.warn("Timed out executing request {}: {}", requestId, query);
+ requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE);
+ return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR);
} catch (Throwable t) {
String consolidatedMessage =
ExceptionUtils.consolidateExceptionMessages(t);
LOGGER.error("Caught exception executing request {}: {}, {}", requestId,
query, consolidatedMessage);
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 0e0c35d318..d44532a500 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -44,6 +44,7 @@ import
org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.query.reduce.BrokerReduceService;
import org.apache.pinot.core.transport.AsyncQueryResponse;
+import org.apache.pinot.core.transport.QueryResponse;
import org.apache.pinot.core.transport.QueryRouter;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerResponse;
@@ -116,6 +117,9 @@ public class SingleConnectionBrokerRequestHandler extends
BaseBrokerRequestHandl
realtimeBrokerRequest, realtimeRoutingTable, timeoutMs);
_failureDetector.notifyQuerySubmitted(asyncQueryResponse);
Map<ServerRoutingInstance, ServerResponse> finalResponses =
asyncQueryResponse.getFinalResponses();
+ if (asyncQueryResponse.getStatus() == QueryResponse.Status.TIMED_OUT) {
+ _brokerMetrics.addMeteredTableValue(rawTableName,
BrokerMeter.BROKER_RESPONSES_WITH_TIMEOUTS, 1);
+ }
_failureDetector.notifyQueryFinished(asyncQueryResponse);
_brokerMetrics.addPhaseTiming(rawTableName,
BrokerQueryPhase.SCATTER_GATHER,
System.nanoTime() - scatterGatherStartTimeNs);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index bde0f3c409..bb76591ab0 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -67,6 +67,9 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
// This metric track the number of broker responses with not all servers
responded.
// (numServersQueried > numServersResponded)
BROKER_RESPONSES_WITH_PARTIAL_SERVERS_RESPONDED("badResponses", false),
+
+ BROKER_RESPONSES_WITH_TIMEOUTS("badResponses", false),
+
// This metric track the number of broker responses with number of groups
limit reached (potential bad responses).
BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED("badResponses", false),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]