This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8b26df6 Add a broker metric to distinguish exception happens when
acquire channel lock or when send request to server (#8105)
8b26df6 is described below
commit 8b26df6b758cde4445e8bf3682a325226fd020a9
Author: Liang Mingqiang <[email protected]>
AuthorDate: Wed Feb 2 18:02:59 2022 -0800
Add a broker metric to distinguish exception happens when acquire channel
lock or when send request to server (#8105)
* Add a broker metric to distinguish exception happens when acquire channel
lock or send request to server
* don't use reflection in catch block
* compare timeout message
---
.../apache/pinot/common/metrics/BrokerMeter.java | 1 +
.../apache/pinot/core/transport/QueryRouter.java | 24 ++++++++++++++++------
.../pinot/core/transport/ServerChannels.java | 3 ++-
3 files changed, 21 insertions(+), 7 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 0d505cf..67dbb37 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -43,6 +43,7 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
// Scatter phase.
NO_SERVER_FOUND_EXCEPTIONS("exceptions", false),
REQUEST_TIMEOUT_BEFORE_SCATTERED_EXCEPTIONS("exceptions", false),
+ REQUEST_CHANNEL_LOCK_TIMEOUT_EXCEPTIONS("exceptions", false),
REQUEST_SEND_EXCEPTIONS("exceptions", false),
// Gather phase.
RESPONSE_FETCH_EXCEPTIONS("exceptions", false),
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index 40c32c8..db08d96 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.BrokerMeter;
@@ -119,15 +120,18 @@ public class QueryRouter {
ServerRoutingInstance serverRoutingInstance = entry.getKey();
ServerChannels serverChannels = serverRoutingInstance.isTlsEnabled() ?
_serverChannelsTls : _serverChannels;
try {
- serverChannels.sendRequest(rawTableName, asyncQueryResponse,
serverRoutingInstance, entry.getValue(),
- timeoutMs);
+ serverChannels
+ .sendRequest(rawTableName, asyncQueryResponse,
serverRoutingInstance, entry.getValue(), timeoutMs);
asyncQueryResponse.markRequestSubmitted(serverRoutingInstance);
+ } catch (TimeoutException e) {
+ if (ServerChannels.CHANNEL_LOCK_TIMEOUT_MSG.equals(e.getMessage())) {
+ _brokerMetrics.addMeteredTableValue(rawTableName,
BrokerMeter.REQUEST_CHANNEL_LOCK_TIMEOUT_EXCEPTIONS, 1);
+ }
+ markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse,
e);
+ break;
} catch (Exception e) {
- LOGGER.error("Caught exception while sending request {} to server: {},
marking query failed", requestId,
- serverRoutingInstance, e);
_brokerMetrics.addMeteredTableValue(rawTableName,
BrokerMeter.REQUEST_SEND_EXCEPTIONS, 1);
- asyncQueryResponse.setBrokerRequestSendException(e);
- asyncQueryResponse.markQueryFailed();
+ markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse,
e);
break;
}
}
@@ -135,6 +139,14 @@ public class QueryRouter {
return asyncQueryResponse;
}
+ private void markQueryFailed(long requestId, ServerRoutingInstance
serverRoutingInstance,
+ AsyncQueryResponse asyncQueryResponse, Exception e) {
+ LOGGER.error("Caught exception while sending request {} to server: {},
marking query failed", requestId,
+ serverRoutingInstance, e);
+ asyncQueryResponse.setBrokerRequestSendException(e);
+ asyncQueryResponse.markQueryFailed();
+ }
+
public void shutDown() {
_serverChannels.shutDown();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index e5b422a..2e6a77c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -53,6 +53,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
*/
@ThreadSafe
public class ServerChannels {
+ public static final String CHANNEL_LOCK_TIMEOUT_MSG = "Timeout while
acquiring channel lock";
private final QueryRouter _queryRouter;
private final BrokerMetrics _brokerMetrics;
private final TSerializer _serializer = new TSerializer(new
TCompactProtocol.Factory());
@@ -154,7 +155,7 @@ public class ServerChannels {
_channelLock.unlock();
}
} else {
- throw new TimeoutException("Timeout while acquiring channel lock");
+ throw new TimeoutException(CHANNEL_LOCK_TIMEOUT_MSG);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]