This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1748be4f82 Add broker query event listener (#11437)
1748be4f82 is described below
commit 1748be4f82a57ed2cea8ff35d4dd3fbabdb5750e
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Wed Sep 27 01:30:32 2023 +0530
Add broker query event listener (#11437)
---
.../broker/broker/helix/BaseBrokerStarter.java | 17 +++-
.../requesthandler/BaseBrokerRequestHandler.java | 18 +++-
.../requesthandler/GrpcBrokerRequestHandler.java | 6 +-
.../MultiStageBrokerRequestHandler.java | 12 ++-
.../SingleConnectionBrokerRequestHandler.java | 7 +-
.../BaseBrokerRequestHandlerTest.java | 3 +-
.../LiteralOnlyBrokerRequestTest.java | 10 +-
.../MultiStageBrokerRequestHandlerTest.java | 6 +-
.../query/BrokerQueryEventListener.java | 29 ++++++
.../query/NoOpBrokerQueryEventListener.java | 36 +++++++
.../PinotBrokerQueryEventListenerFactory.java | 106 +++++++++++++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 4 +
12 files changed, 231 insertions(+), 23 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index c6b3627536..4960809565 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -70,6 +70,8 @@ import
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsMa
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
+import
org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.services.ServiceRole;
@@ -124,6 +126,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
protected HelixManager _participantHelixManager;
// Handles the server routing stats.
protected ServerRoutingStatsManager _serverRoutingStatsManager;
+ protected BrokerQueryEventListener _brokerQueryEventListener;
@Override
public void init(PinotConfiguration brokerConf)
@@ -284,6 +287,10 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf,
Broker.BROKER_TLS_PREFIX);
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf,
Broker.BROKER_NETTY_PREFIX);
+ LOGGER.info("Initializing Broker Event Listener Factory");
+ _brokerQueryEventListener =
PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener(
+ _brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));
+
// Create Broker request handler.
String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID,
getDefaultBrokerId());
String brokerRequestHandlerType =
@@ -292,16 +299,18 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
if
(brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE))
{
singleStageBrokerRequestHandler =
new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager,
_accessControlFactory, queryQuotaManager,
- tableCache, _brokerMetrics, null);
+ tableCache, _brokerMetrics, null, _brokerQueryEventListener);
} else { // default request handler type, e.g. netty
if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId,
_routingManager, _accessControlFactory,
- queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults,
tlsDefaults, _serverRoutingStatsManager);
+ queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults,
tlsDefaults, _serverRoutingStatsManager,
+ _brokerQueryEventListener);
} else {
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId,
_routingManager, _accessControlFactory,
- queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults,
null, _serverRoutingStatsManager);
+ queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults,
null, _serverRoutingStatsManager,
+ _brokerQueryEventListener);
}
}
@@ -312,7 +321,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
// TODO: decouple protocol and engine selection.
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId,
_routingManager, _accessControlFactory,
- queryQuotaManager, tableCache, _brokerMetrics);
+ queryQuotaManager, tableCache, _brokerMetrics,
_brokerQueryEventListener);
}
_brokerRequestHandler = new BrokerRequestHandlerDelegate(brokerId,
singleStageBrokerRequestHandler,
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 420dc15eb2..47ffeddedf 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -90,6 +90,7 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.Tracing;
@@ -138,6 +139,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
protected final long _brokerTimeoutMs;
protected final int _queryResponseLimit;
protected final QueryLogger _queryLogger;
+ protected final BrokerQueryEventListener _brokerQueryEventListener;
private final boolean _disableGroovy;
private final boolean _useApproximateFunction;
@@ -148,7 +150,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId,
BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
- BrokerMetrics brokerMetrics) {
+ BrokerMetrics brokerMetrics, BrokerQueryEventListener
brokerQueryEventListener) {
_brokerId = brokerId;
_brokerIdGenerator = new BrokerRequestIdGenerator(brokerId);
_config = config;
@@ -172,6 +174,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
_queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;
+ _brokerQueryEventListener = brokerQueryEventListener;
LOGGER.info(
"Broker Id: {}, timeout: {}ms, query response limit: {}, query log
length: {}, query log max rate: {}qps, "
+ "enabling query cancellation: {}", _brokerId, _brokerTimeoutMs,
_queryResponseLimit,
@@ -222,8 +225,8 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
// Unexpected server responses are collected and returned as exception.
if (status != 200 && status != 404) {
String responseString =
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
- throw new Exception(String.format("Unexpected status=%d and
response='%s' from uri='%s'", status,
- responseString, uri));
+ throw new Exception(
+ String.format("Unexpected status=%d and response='%s' from
uri='%s'", status, responseString, uri));
}
if (serverResponses != null) {
serverResponses.put(uri.getHost() + ":" + uri.getPort(), status);
@@ -251,19 +254,23 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
throws Exception {
requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
+ long requestId = _brokerIdGenerator.get();
+ requestContext.setRequestId(requestId);
+
// First-stage access control to prevent unauthenticated requests from
using up resources. Secondary table-level
// check comes later.
boolean hasAccess =
_accessControlFactory.create().hasAccess(requesterIdentity);
if (!hasAccess) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR,
1);
requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
+ _brokerQueryEventListener.onQueryCompletion(requestContext);
throw new WebApplicationException("Permission denied",
Response.Status.FORBIDDEN);
}
- long requestId = _brokerIdGenerator.get();
- requestContext.setRequestId(requestId);
JsonNode sql = request.get(Broker.Request.SQL);
if (sql == null) {
+ requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
+ _brokerQueryEventListener.onQueryCompletion(requestContext);
throw new BadQueryRequestException("Failed to find 'sql' in the request:
" + request);
}
String query = sql.asText();
@@ -282,6 +289,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
brokerResponse.setRequestId(String.valueOf(requestId));
brokerResponse.setBrokerId(_brokerId);
brokerResponse.setBrokerReduceTimeMs(requestContext.getReduceTimeMillis());
+ _brokerQueryEventListener.onQueryCompletion(requestContext);
return brokerResponse;
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index 56be2ea4fb..b2e36d16ef 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -42,6 +42,7 @@ import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
import org.apache.pinot.spi.trace.RequestContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,8 +62,9 @@ public class GrpcBrokerRequestHandler extends
BaseBrokerRequestHandler {
// TODO: Support TLS
public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId,
BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
- BrokerMetrics brokerMetrics, TlsConfig tlsConfig) {
- super(config, brokerId, routingManager, accessControlFactory,
queryQuotaManager, tableCache, brokerMetrics);
+ BrokerMetrics brokerMetrics, TlsConfig tlsConfig,
BrokerQueryEventListener brokerQueryEventListener) {
+ super(config, brokerId, routingManager, accessControlFactory,
queryQuotaManager, tableCache, brokerMetrics,
+ brokerQueryEventListener);
LOGGER.info("Using Grpc BrokerRequestHandler.");
_grpcConfig = GrpcConfig.buildGrpcQueryConfig(config);
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index e352ce906e..7fb9b24c14 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -62,6 +62,7 @@ import
org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.query.type.TypeFactory;
import org.apache.pinot.query.type.TypeSystem;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -77,10 +78,13 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
private final MailboxService _mailboxService;
private final QueryDispatcher _queryDispatcher;
- public MultiStageBrokerRequestHandler(PinotConfiguration config, String
brokerId, BrokerRoutingManager routingManager,
- AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
- BrokerMetrics brokerMetrics) {
- super(config, brokerId, routingManager, accessControlFactory,
queryQuotaManager, tableCache, brokerMetrics);
+
+ public MultiStageBrokerRequestHandler(PinotConfiguration config, String
brokerId,
+ BrokerRoutingManager routingManager, AccessControlFactory
accessControlFactory,
+ QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics,
+ BrokerQueryEventListener brokerQueryEventListener) {
+ super(config, brokerId, routingManager, accessControlFactory,
queryQuotaManager, tableCache,
+ brokerMetrics, brokerQueryEventListener);
LOGGER.info("Using Multi-stage BrokerRequestHandler.");
String hostname =
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port =
Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index d78aa5ffe8..3ad8648442 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -50,6 +50,7 @@ import org.apache.pinot.core.transport.ServerResponse;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -72,8 +73,10 @@ public class SingleConnectionBrokerRequestHandler extends
BaseBrokerRequestHandl
public SingleConnectionBrokerRequestHandler(PinotConfiguration config,
String brokerId,
BrokerRoutingManager routingManager, AccessControlFactory
accessControlFactory,
QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics, NettyConfig nettyConfig,
- TlsConfig tlsConfig, ServerRoutingStatsManager
serverRoutingStatsManager) {
- super(config, brokerId, routingManager, accessControlFactory,
queryQuotaManager, tableCache, brokerMetrics);
+ TlsConfig tlsConfig, ServerRoutingStatsManager serverRoutingStatsManager,
+ BrokerQueryEventListener brokerQueryEventListener) {
+ super(config, brokerId, routingManager, accessControlFactory,
queryQuotaManager, tableCache, brokerMetrics,
+ brokerQueryEventListener);
LOGGER.info("Using Netty BrokerRequestHandler.");
_brokerReduceService = new BrokerReduceService(_config);
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
index f2ee4cd03e..e38db6e020 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
@@ -210,7 +210,8 @@ public class BaseBrokerRequestHandlerTest {
BaseBrokerRequestHandler requestHandler =
new BaseBrokerRequestHandler(config, "testBrokerId", routingManager,
new AllowAllAccessControlFactory(),
queryQuotaManager, tableCache,
- new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(),
true, Collections.emptySet())) {
+ new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(),
true,
+ Collections.emptySet()), null) {
@Override
public void start() {
}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
index 78c059c696..749afeb233 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
@@ -31,6 +31,7 @@ import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.spi.env.PinotConfiguration;
+import
org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.Tracing;
@@ -183,7 +184,8 @@ public class LiteralOnlyBrokerRequestTest {
SingleConnectionBrokerRequestHandler requestHandler =
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
"testBrokerId", null, ACCESS_CONTROL_FACTORY,
null, null, new BrokerMetrics("",
PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
- null, null, mock(ServerRoutingStatsManager.class));
+ null, null, mock(ServerRoutingStatsManager.class),
+
PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener());
long randNum = RANDOM.nextLong();
byte[] randBytes = new byte[12];
@@ -211,7 +213,8 @@ public class LiteralOnlyBrokerRequestTest {
SingleConnectionBrokerRequestHandler requestHandler =
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
"testBrokerId", null, ACCESS_CONTROL_FACTORY,
null, null, new BrokerMetrics("",
PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
- null, null, mock(ServerRoutingStatsManager.class));
+ null, null, mock(ServerRoutingStatsManager.class),
+
PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener());
long currentTsMin = System.currentTimeMillis();
JsonNode request = JsonUtils.stringToJsonNode(
"{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC',
'yyyy-MM-dd z') as firstDayOf2020\"}");
@@ -418,7 +421,8 @@ public class LiteralOnlyBrokerRequestTest {
SingleConnectionBrokerRequestHandler requestHandler =
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
"testBrokerId", null, ACCESS_CONTROL_FACTORY,
null, null, new BrokerMetrics("",
PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
- null, null, mock(ServerRoutingStatsManager.class));
+ null, null, mock(ServerRoutingStatsManager.class),
+
PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener());
// Test 1: select constant
JsonNode request = JsonUtils.stringToJsonNode("{\"sql\":\"EXPLAIN PLAN FOR
SELECT 1.5, 'test'\"}");
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
index fe37953ae0..45d41ffc6f 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.spi.env.PinotConfiguration;
+import
org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory;
import org.apache.pinot.spi.trace.DefaultRequestContext;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -65,8 +66,9 @@ public class MultiStageBrokerRequestHandlerTest {
_config.setProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
"12345");
_accessControlFactory = new AllowAllAccessControlFactory();
_requestHandler =
- new MultiStageBrokerRequestHandler(_config, "Broker_localhost",
_routingManager, _accessControlFactory,
- _queryQuotaManager, _tableCache, _brokerMetrics);
+ new MultiStageBrokerRequestHandler(_config, "Broker_localhost",
_routingManager,
+ _accessControlFactory, _queryQuotaManager, _tableCache,
_brokerMetrics,
+
PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener());
}
@Test
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/BrokerQueryEventListener.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/BrokerQueryEventListener.java
new file mode 100644
index 0000000000..ea07a2df64
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/BrokerQueryEventListener.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.eventlistener.query;
+
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.RequestContext;
+
+
+public interface BrokerQueryEventListener {
+
+ void init(PinotConfiguration eventListenerConfiguration);
+ void onQueryCompletion(RequestContext requestContext);
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/NoOpBrokerQueryEventListener.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/NoOpBrokerQueryEventListener.java
new file mode 100644
index 0000000000..5b0fbd3a40
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/NoOpBrokerQueryEventListener.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.eventlistener.query;
+
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.RequestContext;
+
+
+public class NoOpBrokerQueryEventListener implements BrokerQueryEventListener {
+
+ @Override
+ public void init(PinotConfiguration eventListenerConfiguration) {
+ // Not implemented method
+ }
+
+ @Override
+ public void onQueryCompletion(RequestContext requestContext) {
+ // Not implemented method
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java
new file mode 100644
index 0000000000..828f214a16
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.eventlistener.query;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_BROKER_EVENT_LISTENER_CLASS_NAME;
+import static
org.apache.pinot.spi.utils.CommonConstants.DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME;
+
+
+public class PinotBrokerQueryEventListenerFactory {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PinotBrokerQueryEventListenerFactory.class);
+ private static BrokerQueryEventListener _brokerQueryEventListener = null;
+
+ private PinotBrokerQueryEventListenerFactory() {
+ }
+
+ /**
+ * Initialize the BrokerQueryEventListener and registers the eventListener
+ */
+ @VisibleForTesting
+ public synchronized static void init(PinotConfiguration
eventListenerConfiguration) {
+ // Initializes BrokerQueryEventListener.
+ initializeBrokerQueryEventListener(eventListenerConfiguration);
+ }
+
+ /**
+ * Initializes PinotBrokerQueryEventListener with event-listener
configurations.
+ * @param eventListenerConfiguration The subset of the configuration
containing the event-listener-related keys
+ */
+ private static void initializeBrokerQueryEventListener(PinotConfiguration
eventListenerConfiguration) {
+ String brokerQueryEventListenerClassName =
+
eventListenerConfiguration.getProperty(CONFIG_OF_BROKER_EVENT_LISTENER_CLASS_NAME,
+ DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME);
+ LOGGER.info("{} will be initialized as the PinotBrokerQueryEventListener",
brokerQueryEventListenerClassName);
+
+ Optional<Class<?>> clazzFound;
+ try {
+ clazzFound =
Optional.of(Class.forName(brokerQueryEventListenerClassName));
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Failed to initialize
BrokerQueryEventListener. "
+ + "Please check if any pinot-event-listener related jar is actually
added to the classpath.");
+ }
+
+ clazzFound.ifPresent(clazz -> {
+ try {
+ BrokerQueryEventListener brokerQueryEventListener =
(BrokerQueryEventListener) clazz.newInstance();
+ brokerQueryEventListener.init(eventListenerConfiguration);
+ registerBrokerEventListener(brokerQueryEventListener);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while initializing event listener
registry: {}, skipping it", clazz, e);
+ }
+ });
+
+ Preconditions.checkState(_brokerQueryEventListener != null, "Failed to
initialize BrokerQueryEventListener. "
+ + "Please check if any pinot-event-listener related jar is actually
added to the classpath.");
+ }
+
+ /**
+ * Registers a broker event listener.
+ */
+ private static void registerBrokerEventListener(BrokerQueryEventListener
brokerQueryEventListener) {
+ LOGGER.info("Registering broker event listener : {}",
brokerQueryEventListener.getClass().getName());
+ _brokerQueryEventListener = brokerQueryEventListener;
+ }
+
+ /**
+ * Returns the brokerQueryEventListener. If the BrokerQueryEventListener is
null,
+ * first creates and initializes the BrokerQueryEventListener.
+ * @param eventListenerConfiguration event-listener configs
+ */
+ public static synchronized BrokerQueryEventListener
getBrokerQueryEventListener(
+ PinotConfiguration eventListenerConfiguration) {
+ if (_brokerQueryEventListener == null) {
+ init(eventListenerConfiguration);
+ }
+ return _brokerQueryEventListener;
+ }
+
+ @VisibleForTesting
+ public static BrokerQueryEventListener getBrokerQueryEventListener() {
+ return getBrokerQueryEventListener(new
PinotConfiguration(Collections.emptyMap()));
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 56e6dcf999..80c1816702 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -43,8 +43,11 @@ public class CommonConstants {
public static final String UNKNOWN = "unknown";
public static final String CONFIG_OF_METRICS_FACTORY_CLASS_NAME =
"factory.className";
+ public static final String CONFIG_OF_BROKER_EVENT_LISTENER_CLASS_NAME =
"factory.className";
public static final String DEFAULT_METRICS_FACTORY_CLASS_NAME =
"org.apache.pinot.plugin.metrics.yammer.YammerMetricsFactory";
+ public static final String DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME =
+ "org.apache.pinot.spi.eventlistener.query.NoOpBrokerQueryEventListener";
public static final String SWAGGER_AUTHORIZATION_KEY = "oauth";
public static final String CONFIG_OF_SWAGGER_RESOURCES_PATH =
"META-INF/resources/webjars/swagger-ui/5.1.0/";
@@ -205,6 +208,7 @@ public class CommonConstants {
public static final String ROUTING_TABLE_CONFIG_PREFIX =
"pinot.broker.routing.table";
public static final String ACCESS_CONTROL_CONFIG_PREFIX =
"pinot.broker.access.control";
public static final String METRICS_CONFIG_PREFIX = "pinot.broker.metrics";
+ public static final String EVENT_LISTENER_CONFIG_PREFIX =
"pinot.broker.event.listener";
public static final String CONFIG_OF_METRICS_NAME_PREFIX =
"pinot.broker.metrics.prefix";
public static final String DEFAULT_METRICS_NAME_PREFIX = "pinot.broker.";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]