This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b26df6  Add a broker metric to distinguish exception happens when 
acquire channel lock or when send request to server (#8105)
8b26df6 is described below

commit 8b26df6b758cde4445e8bf3682a325226fd020a9
Author: Liang Mingqiang <[email protected]>
AuthorDate: Wed Feb 2 18:02:59 2022 -0800

    Add a broker metric to distinguish exception happens when acquire channel 
lock or when send request to server (#8105)
    
    * Add a broker metric to distinguish exception happens when acquire channel 
lock or send request to server
    
    * don't use reflection in catch block
    
    * compare timeut message
---
 .../apache/pinot/common/metrics/BrokerMeter.java   |  1 +
 .../apache/pinot/core/transport/QueryRouter.java   | 24 ++++++++++++++++------
 .../pinot/core/transport/ServerChannels.java       |  3 ++-
 3 files changed, 21 insertions(+), 7 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 0d505cf..67dbb37 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -43,6 +43,7 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
   // Scatter phase.
   NO_SERVER_FOUND_EXCEPTIONS("exceptions", false),
   REQUEST_TIMEOUT_BEFORE_SCATTERED_EXCEPTIONS("exceptions", false),
+  REQUEST_CHANNEL_LOCK_TIMEOUT_EXCEPTIONS("exceptions", false),
   REQUEST_SEND_EXCEPTIONS("exceptions", false),
   // Gather phase.
   RESPONSE_FETCH_EXCEPTIONS("exceptions", false),
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index 40c32c8..db08d96 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.BrokerMeter;
@@ -119,15 +120,18 @@ public class QueryRouter {
       ServerRoutingInstance serverRoutingInstance = entry.getKey();
       ServerChannels serverChannels = serverRoutingInstance.isTlsEnabled() ? 
_serverChannelsTls : _serverChannels;
       try {
-        serverChannels.sendRequest(rawTableName, asyncQueryResponse, 
serverRoutingInstance, entry.getValue(),
-            timeoutMs);
+        serverChannels
+            .sendRequest(rawTableName, asyncQueryResponse, 
serverRoutingInstance, entry.getValue(), timeoutMs);
         asyncQueryResponse.markRequestSubmitted(serverRoutingInstance);
+      } catch (TimeoutException e) {
+        if (ServerChannels.CHANNEL_LOCK_TIMEOUT_MSG.equals(e.getMessage())) {
+          _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.REQUEST_CHANNEL_LOCK_TIMEOUT_EXCEPTIONS, 1);
+        }
+        markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse, 
e);
+        break;
       } catch (Exception e) {
-        LOGGER.error("Caught exception while sending request {} to server: {}, 
marking query failed", requestId,
-            serverRoutingInstance, e);
         _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.REQUEST_SEND_EXCEPTIONS, 1);
-        asyncQueryResponse.setBrokerRequestSendException(e);
-        asyncQueryResponse.markQueryFailed();
+        markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse, 
e);
         break;
       }
     }
@@ -135,6 +139,14 @@ public class QueryRouter {
     return asyncQueryResponse;
   }
 
+  private void markQueryFailed(long requestId, ServerRoutingInstance 
serverRoutingInstance,
+      AsyncQueryResponse asyncQueryResponse, Exception e) {
+    LOGGER.error("Caught exception while sending request {} to server: {}, 
marking query failed", requestId,
+        serverRoutingInstance, e);
+    asyncQueryResponse.setBrokerRequestSendException(e);
+    asyncQueryResponse.markQueryFailed();
+  }
+
   public void shutDown() {
     _serverChannels.shutDown();
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index e5b422a..2e6a77c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -53,6 +53,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
  */
 @ThreadSafe
 public class ServerChannels {
+  public static final String CHANNEL_LOCK_TIMEOUT_MSG = "Timeout while 
acquiring channel lock";
   private final QueryRouter _queryRouter;
   private final BrokerMetrics _brokerMetrics;
   private final TSerializer _serializer = new TSerializer(new 
TCompactProtocol.Factory());
@@ -154,7 +155,7 @@ public class ServerChannels {
           _channelLock.unlock();
         }
       } else {
-        throw new TimeoutException("Timeout while acquiring channel lock");
+        throw new TimeoutException(CHANNEL_LOCK_TIMEOUT_MSG);
       }
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to