This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cc1915140 Add HttpHeaders in broker event listener requestContext (#12258)
6cc1915140 is described below

commit 6cc19151404cfc7acf5677ef3395e129a978e2d8
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Mon Jan 29 11:19:45 2024 +0530

    Add HttpHeaders in broker event listener requestContext (#12258)
---
 .../requesthandler/BaseBrokerRequestHandler.java   |  6 ++++
 .../PinotBrokerQueryEventListenerFactory.java      | 34 ++++++++++++++++++++++
 .../pinot/spi/trace/DefaultRequestContext.java     | 11 +++++++
 .../org/apache/pinot/spi/trace/RequestContext.java |  4 +++
 .../apache/pinot/spi/utils/CommonConstants.java    |  1 +
 5 files changed, 56 insertions(+)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index cc640110fc..46a0839dde 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -92,6 +92,7 @@ import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
+import 
org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.trace.Tracing;
@@ -259,6 +260,11 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
 
     long requestId = _brokerIdGenerator.get();
     requestContext.setRequestId(requestId);
+    if (httpHeaders != null) {
+      
requestContext.setRequestHttpHeaders(httpHeaders.getRequestHeaders().entrySet().stream()
+          .filter(entry -> 
PinotBrokerQueryEventListenerFactory.getAllowlistQueryRequestHeaders()
+              
.contains(entry.getKey())).collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
+    }
 
     // First-stage access control to prevent unauthenticated requests from 
using up resources. Secondary table-level
     // check comes later.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java
index 828f214a16..4aa4add27c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java
@@ -20,19 +20,25 @@ package org.apache.pinot.spi.eventlistener.query;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static 
org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_BROKER_EVENT_LISTENER_CLASS_NAME;
+import static 
org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_REQUEST_CONTEXT_TRACKED_HEADER_KEYS;
 import static 
org.apache.pinot.spi.utils.CommonConstants.DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME;
 
 
 public class PinotBrokerQueryEventListenerFactory {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotBrokerQueryEventListenerFactory.class);
   private static BrokerQueryEventListener _brokerQueryEventListener = null;
+  private static List<String> _allowlistQueryRequestHeaders = new 
ArrayList<>();
 
   private PinotBrokerQueryEventListenerFactory() {
   }
@@ -44,6 +50,8 @@ public class PinotBrokerQueryEventListenerFactory {
   public synchronized static void init(PinotConfiguration 
eventListenerConfiguration) {
     // Initializes BrokerQueryEventListener.
     initializeBrokerQueryEventListener(eventListenerConfiguration);
+    // Initializes request headers
+    initializeAllowlistQueryRequestHeaders(eventListenerConfiguration);
   }
 
   /**
@@ -78,6 +86,19 @@ public class PinotBrokerQueryEventListenerFactory {
         + "Please check if any pinot-event-listener related jar is actually 
added to the classpath.");
   }
 
+  /**
+   * Initializes allowlist request-headers to extract from query request.
+   * @param eventListenerConfiguration The subset of the configuration 
containing the event-listener-related keys
+   */
+  private static void 
initializeAllowlistQueryRequestHeaders(PinotConfiguration 
eventListenerConfiguration) {
+    List<String> allowlistQueryRequestHeaders =
+        Splitter.on(",").omitEmptyStrings().trimResults()
+            
.splitToList(eventListenerConfiguration.getProperty(CONFIG_OF_REQUEST_CONTEXT_TRACKED_HEADER_KEYS,
 ""));
+
+    LOGGER.info("{}: allowlist headers will be used for 
PinotBrokerQueryEventListener", allowlistQueryRequestHeaders);
+    registerAllowlistQueryRequestHeaders(allowlistQueryRequestHeaders);
+  }
+
   /**
    * Registers a broker event listener.
    */
@@ -86,6 +107,14 @@ public class PinotBrokerQueryEventListenerFactory {
     _brokerQueryEventListener = brokerQueryEventListener;
   }
 
+  /**
+   * Registers allowlist http headers for query-requests.
+   */
+  private static void registerAllowlistQueryRequestHeaders(List<String> 
allowlistQueryRequestHeaders) {
+    LOGGER.info("Registering query request headers allowlist : {}", 
allowlistQueryRequestHeaders);
+    _allowlistQueryRequestHeaders = 
ImmutableList.copyOf(allowlistQueryRequestHeaders);
+  }
+
   /**
    * Returns the brokerQueryEventListener. If the BrokerQueryEventListener is 
null,
    * first creates and initializes the BrokerQueryEventListener.
@@ -103,4 +132,9 @@ public class PinotBrokerQueryEventListenerFactory {
   public static BrokerQueryEventListener getBrokerQueryEventListener() {
     return getBrokerQueryEventListener(new 
PinotConfiguration(Collections.emptyMap()));
   }
+
+  @VisibleForTesting
+  public static List<String> getAllowlistQueryRequestHeaders() {
+    return _allowlistQueryRequestHeaders;
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
index 6ce063d253..ec2b8a5a6b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
@@ -80,6 +80,7 @@ public class DefaultRequestContext implements RequestScope {
   private long _explainPlanNumMatchAllFilterSegments;
   private Map<String, String> _traceInfo = new HashMap<>();
   private List<String> _processingExceptions = new ArrayList<>();
+  private Map<String, List<String>> _requestHttpHeaders = new HashMap<>();
 
   public DefaultRequestContext() {
   }
@@ -562,6 +563,16 @@ public class DefaultRequestContext implements RequestScope 
{
     _processingExceptions.addAll(processingExceptions);
   }
 
+  @Override
+  public Map<String, List<String>> getRequestHttpHeaders() {
+    return _requestHttpHeaders;
+  }
+
+  @Override
+  public void setRequestHttpHeaders(Map<String, List<String>> 
requestHttpHeaders) {
+    _requestHttpHeaders.putAll(requestHttpHeaders);
+  }
+
   @Override
   public void close() {
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
index f8ec35a921..d1ca85f997 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
@@ -217,6 +217,10 @@ public interface RequestContext {
 
   void setProcessingExceptions(List<String> processingExceptions);
 
+  Map<String, List<String>> getRequestHttpHeaders();
+
+  void setRequestHttpHeaders(Map<String, List<String>> requestHttpHeaders);
+
   enum FanoutType {
     OFFLINE, REALTIME, HYBRID
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 368e194a39..4a39e278ab 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -44,6 +44,7 @@ public class CommonConstants {
   public static final String UNKNOWN = "unknown";
   public static final String CONFIG_OF_METRICS_FACTORY_CLASS_NAME = 
"factory.className";
   public static final String CONFIG_OF_BROKER_EVENT_LISTENER_CLASS_NAME = 
"factory.className";
+  public static final String CONFIG_OF_REQUEST_CONTEXT_TRACKED_HEADER_KEYS = 
"request.context.tracked.header.keys";
   public static final String DEFAULT_METRICS_FACTORY_CLASS_NAME =
       "org.apache.pinot.plugin.metrics.yammer.YammerMetricsFactory";
   public static final String DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to