This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 67344859c2 Log Request header for Broker (#8907)
67344859c2 is described below

commit 67344859c270b9e64638b6691d456e57a7570403
Author: abhinav <[email protected]>
AuthorDate: Mon Aug 8 13:55:39 2022 -0700

    Log Request header for Broker (#8907)
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 40 +++++++++++++++++++---
 .../apache/pinot/spi/utils/CommonConstants.java    |  4 +++
 2 files changed, 39 insertions(+), 5 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 985235d0ef..7ca62d1ac9 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -39,6 +39,7 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.broker.api.HttpRequesterIdentity;
 import org.apache.pinot.broker.api.RequesterIdentity;
 import org.apache.pinot.broker.broker.AccessControlFactory;
 import org.apache.pinot.broker.queryquota.QueryQuotaManager;
@@ -454,8 +455,9 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
 
       // Send empty response since we don't need to evaluate either offline or 
realtime request.
       BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
+      // Extract source info from incoming request
       logBrokerResponse(requestId, query, requestContext, tableName, 0, new 
ServerStats(), brokerResponse,
-          System.nanoTime());
+          System.nanoTime(), requesterIdentity);
       return brokerResponse;
     }
 
@@ -589,8 +591,9 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     _brokerMetrics.addTimedTableValue(rawTableName, 
BrokerTimer.QUERY_TOTAL_TIME_MS, totalTimeMs,
         TimeUnit.MILLISECONDS);
 
+    // Extract source info from incoming request
     logBrokerResponse(requestId, query, requestContext, tableName, 
numUnavailableSegments, serverStats, brokerResponse,
-        totalTimeMs);
+        totalTimeMs, requesterIdentity);
     return brokerResponse;
   }
 
@@ -661,9 +664,34 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   }
 
   private void logBrokerResponse(long requestId, String query, RequestContext 
requestContext, String tableName,
-      int numUnavailableSegments, ServerStats serverStats, 
BrokerResponseNative brokerResponse, long totalTimeMs) {
+      int numUnavailableSegments, ServerStats serverStats, 
BrokerResponseNative brokerResponse, long totalTimeMs,
+      @Nullable RequesterIdentity requesterIdentity) {
     LOGGER.debug("Broker Response: {}", brokerResponse);
 
+    boolean enableClientIpLogging = 
_config.getProperty(Broker.CONFIG_OF_BROKER_REQUEST_CLIENT_IP_LOGGING,
+        Broker.DEFAULT_BROKER_REQUEST_CLIENT_IP_LOGGING);
+    String clientIp = "unknown";
+    // If reverse proxy is used X-Forwarded-For will be populated
+    // If X-Forwarded-For is not present, check if x-real-ip is present
+    // Since X-Forwarded-For can contain comma separated list of values, we 
convert it to ";" delimiter to avoid
+    // downstream parsing errors for other fields where "," is being used
+    if (enableClientIpLogging && requesterIdentity != null) {
+      for (Map.Entry<String, String> entry : ((HttpRequesterIdentity) 
requesterIdentity).getHttpHeaders().entries()) {
+        String key = entry.getKey();
+        String value = entry.getValue();
+        if (key.equalsIgnoreCase("x-forwarded-for")) {
+          if (value.contains(",")) {
+            clientIp = String.join(";", value.split(","));
+          } else {
+            clientIp = value;
+          }
+        } else if (key.equalsIgnoreCase("x-real-ip")) {
+          clientIp = value;
+        }
+      }
+    }
+
+
     // Please keep the format as name=value comma-separated with no spaces
     // Please keep all the name value pairs together, then followed by the 
query. To add a new entry, please add it to
     // the end of existing pairs, but before the query.
@@ -674,7 +702,8 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
               + "{}/{}/{}/{}/{}/{}/{},consumingFreshnessTimeMs={},"
               + 
"servers={}/{},groupLimitReached={},brokerReduceTimeMs={},exceptions={},serverStats={},"
               + 
"offlineThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{},"
-              + 
"realtimeThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{},query={}",
 requestId, tableName,
+              + 
"realtimeThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{},clientIp={}"
+              + ",query={}", requestId, tableName,
           totalTimeMs, brokerResponse.getNumDocsScanned(), 
brokerResponse.getTotalDocs(),
           brokerResponse.getNumEntriesScannedInFilter(), 
brokerResponse.getNumEntriesScannedPostFilter(),
           brokerResponse.getNumSegmentsQueried(), 
brokerResponse.getNumSegmentsProcessed(),
@@ -687,7 +716,8 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
           brokerResponse.getOfflineThreadCpuTimeNs(), 
brokerResponse.getOfflineSystemActivitiesCpuTimeNs(),
           brokerResponse.getOfflineResponseSerializationCpuTimeNs(), 
brokerResponse.getRealtimeTotalCpuTimeNs(),
           brokerResponse.getRealtimeThreadCpuTimeNs(), 
brokerResponse.getRealtimeSystemActivitiesCpuTimeNs(),
-          brokerResponse.getRealtimeResponseSerializationCpuTimeNs(), 
StringUtils.substring(query, 0, _queryLogLength));
+          brokerResponse.getRealtimeResponseSerializationCpuTimeNs(), clientIp,
+          StringUtils.substring(query, 0, _queryLogLength));
 
       // Limit the dropping log message at most once per second.
       if (_numDroppedLogRateLimiter.tryAcquire()) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 6157bfbbb6..47866461ec 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -254,6 +254,10 @@ public class CommonConstants {
 
     public static final String CONTROLLER_URL = "pinot.broker.controller.url";
 
+    public static final String CONFIG_OF_BROKER_REQUEST_CLIENT_IP_LOGGING =
+        "pinot.broker.request.client.ip.logging";
+    public static final boolean DEFAULT_BROKER_REQUEST_CLIENT_IP_LOGGING = 
true;
+
     public static class Request {
       public static final String SQL = "sql";
       public static final String TRACE = "trace";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to