This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6cc1915140 Add HttpHeaders in broker event listener requestContext (#12258)
6cc1915140 is described below
commit 6cc19151404cfc7acf5677ef3395e129a978e2d8
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Mon Jan 29 11:19:45 2024 +0530
Add HttpHeaders in broker event listener requestContext (#12258)
---
.../requesthandler/BaseBrokerRequestHandler.java | 6 ++++
.../PinotBrokerQueryEventListenerFactory.java | 34 ++++++++++++++++++++++
.../pinot/spi/trace/DefaultRequestContext.java | 11 +++++++
.../org/apache/pinot/spi/trace/RequestContext.java | 4 +++
.../apache/pinot/spi/utils/CommonConstants.java | 1 +
5 files changed, 56 insertions(+)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index cc640110fc..46a0839dde 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -92,6 +92,7 @@ import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
+import
org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.Tracing;
@@ -259,6 +260,11 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
long requestId = _brokerIdGenerator.get();
requestContext.setRequestId(requestId);
+ if (httpHeaders != null) {
+
requestContext.setRequestHttpHeaders(httpHeaders.getRequestHeaders().entrySet().stream()
+ .filter(entry ->
PinotBrokerQueryEventListenerFactory.getAllowlistQueryRequestHeaders()
+
.contains(entry.getKey())).collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue)));
+ }
// First-stage access control to prevent unauthenticated requests from
using up resources. Secondary table-level
// check comes later.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java
index 828f214a16..4aa4add27c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java
@@ -20,19 +20,25 @@ package org.apache.pinot.spi.eventlistener.query;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Optional;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static
org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_BROKER_EVENT_LISTENER_CLASS_NAME;
+import static
org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_REQUEST_CONTEXT_TRACKED_HEADER_KEYS;
import static
org.apache.pinot.spi.utils.CommonConstants.DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME;
public class PinotBrokerQueryEventListenerFactory {
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotBrokerQueryEventListenerFactory.class);
private static BrokerQueryEventListener _brokerQueryEventListener = null;
+ private static List<String> _allowlistQueryRequestHeaders = new
ArrayList<>();
private PinotBrokerQueryEventListenerFactory() {
}
@@ -44,6 +50,8 @@ public class PinotBrokerQueryEventListenerFactory {
public synchronized static void init(PinotConfiguration
eventListenerConfiguration) {
// Initializes BrokerQueryEventListener.
initializeBrokerQueryEventListener(eventListenerConfiguration);
+ // Initializes request headers
+ initializeAllowlistQueryRequestHeaders(eventListenerConfiguration);
}
/**
@@ -78,6 +86,19 @@ public class PinotBrokerQueryEventListenerFactory {
+ "Please check if any pinot-event-listener related jar is actually
added to the classpath.");
}
+ /**
+ * Initializes allowlist request-headers to extract from query request.
+ * @param eventListenerConfiguration The subset of the configuration
containing the event-listener-related keys
+ */
+ private static void
initializeAllowlistQueryRequestHeaders(PinotConfiguration
eventListenerConfiguration) {
+ List<String> allowlistQueryRequestHeaders =
+ Splitter.on(",").omitEmptyStrings().trimResults()
+
.splitToList(eventListenerConfiguration.getProperty(CONFIG_OF_REQUEST_CONTEXT_TRACKED_HEADER_KEYS,
""));
+
+ LOGGER.info("{}: allowlist headers will be used for
PinotBrokerQueryEventListener", allowlistQueryRequestHeaders);
+ registerAllowlistQueryRequestHeaders(allowlistQueryRequestHeaders);
+ }
+
/**
* Registers a broker event listener.
*/
@@ -86,6 +107,14 @@ public class PinotBrokerQueryEventListenerFactory {
_brokerQueryEventListener = brokerQueryEventListener;
}
+ /**
+ * Registers allowlist http headers for query-requests.
+ */
+ private static void registerAllowlistQueryRequestHeaders(List<String>
allowlistQueryRequestHeaders) {
+ LOGGER.info("Registering query request headers allowlist : {}",
allowlistQueryRequestHeaders);
+ _allowlistQueryRequestHeaders =
ImmutableList.copyOf(allowlistQueryRequestHeaders);
+ }
+
/**
* Returns the brokerQueryEventListener. If the BrokerQueryEventListener is
null,
* first creates and initializes the BrokerQueryEventListener.
@@ -103,4 +132,9 @@ public class PinotBrokerQueryEventListenerFactory {
public static BrokerQueryEventListener getBrokerQueryEventListener() {
return getBrokerQueryEventListener(new
PinotConfiguration(Collections.emptyMap()));
}
+
+ @VisibleForTesting
+ public static List<String> getAllowlistQueryRequestHeaders() {
+ return _allowlistQueryRequestHeaders;
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
index 6ce063d253..ec2b8a5a6b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
@@ -80,6 +80,7 @@ public class DefaultRequestContext implements RequestScope {
private long _explainPlanNumMatchAllFilterSegments;
private Map<String, String> _traceInfo = new HashMap<>();
private List<String> _processingExceptions = new ArrayList<>();
+ private Map<String, List<String>> _requestHttpHeaders = new HashMap<>();
public DefaultRequestContext() {
}
@@ -562,6 +563,16 @@ public class DefaultRequestContext implements RequestScope
{
_processingExceptions.addAll(processingExceptions);
}
+ @Override
+ public Map<String, List<String>> getRequestHttpHeaders() {
+ return _requestHttpHeaders;
+ }
+
+ @Override
+ public void setRequestHttpHeaders(Map<String, List<String>>
requestHttpHeaders) {
+ _requestHttpHeaders.putAll(requestHttpHeaders);
+ }
+
@Override
public void close() {
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
index f8ec35a921..d1ca85f997 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
@@ -217,6 +217,10 @@ public interface RequestContext {
void setProcessingExceptions(List<String> processingExceptions);
+ Map<String, List<String>> getRequestHttpHeaders();
+
+ void setRequestHttpHeaders(Map<String, List<String>> requestHttpHeaders);
+
enum FanoutType {
OFFLINE, REALTIME, HYBRID
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 368e194a39..4a39e278ab 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -44,6 +44,7 @@ public class CommonConstants {
public static final String UNKNOWN = "unknown";
public static final String CONFIG_OF_METRICS_FACTORY_CLASS_NAME =
"factory.className";
public static final String CONFIG_OF_BROKER_EVENT_LISTENER_CLASS_NAME =
"factory.className";
+ public static final String CONFIG_OF_REQUEST_CONTEXT_TRACKED_HEADER_KEYS =
"request.context.tracked.header.keys";
public static final String DEFAULT_METRICS_FACTORY_CLASS_NAME =
"org.apache.pinot.plugin.metrics.yammer.YammerMetricsFactory";
public static final String DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]