This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1748be4f82 Add broker query event listener (#11437)
1748be4f82 is described below

commit 1748be4f82a57ed2cea8ff35d4dd3fbabdb5750e
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Wed Sep 27 01:30:32 2023 +0530

    Add broker query event listener (#11437)
---
 .../broker/broker/helix/BaseBrokerStarter.java     |  17 +++-
 .../requesthandler/BaseBrokerRequestHandler.java   |  18 +++-
 .../requesthandler/GrpcBrokerRequestHandler.java   |   6 +-
 .../MultiStageBrokerRequestHandler.java            |  12 ++-
 .../SingleConnectionBrokerRequestHandler.java      |   7 +-
 .../BaseBrokerRequestHandlerTest.java              |   3 +-
 .../LiteralOnlyBrokerRequestTest.java              |  10 +-
 .../MultiStageBrokerRequestHandlerTest.java        |   6 +-
 .../query/BrokerQueryEventListener.java            |  29 ++++++
 .../query/NoOpBrokerQueryEventListener.java        |  36 +++++++
 .../PinotBrokerQueryEventListenerFactory.java      | 106 +++++++++++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |   4 +
 12 files changed, 231 insertions(+), 23 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index c6b3627536..4960809565 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -70,6 +70,8 @@ import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsMa
 import org.apache.pinot.core.util.ListenerConfigUtil;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
+import 
org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.spi.services.ServiceRole;
@@ -124,6 +126,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
   protected HelixManager _participantHelixManager;
   // Handles the server routing stats.
   protected ServerRoutingStatsManager _serverRoutingStatsManager;
+  protected BrokerQueryEventListener _brokerQueryEventListener;
 
   @Override
   public void init(PinotConfiguration brokerConf)
@@ -284,6 +287,10 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, 
Broker.BROKER_TLS_PREFIX);
     NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, 
Broker.BROKER_NETTY_PREFIX);
 
+    LOGGER.info("Initializing Broker Event Listener Factory");
+    _brokerQueryEventListener = 
PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener(
+        _brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));
+
     // Create Broker request handler.
     String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, 
getDefaultBrokerId());
     String brokerRequestHandlerType =
@@ -292,16 +299,18 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     if 
(brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE))
 {
       singleStageBrokerRequestHandler =
           new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager, 
_accessControlFactory, queryQuotaManager,
-              tableCache, _brokerMetrics, null);
+              tableCache, _brokerMetrics, null, _brokerQueryEventListener);
     } else { // default request handler type, e.g. netty
       if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
         singleStageBrokerRequestHandler =
             new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, 
_routingManager, _accessControlFactory,
-                queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, 
tlsDefaults, _serverRoutingStatsManager);
+                queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, 
tlsDefaults, _serverRoutingStatsManager,
+                    _brokerQueryEventListener);
       } else {
         singleStageBrokerRequestHandler =
             new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, 
_routingManager, _accessControlFactory,
-                queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, 
null, _serverRoutingStatsManager);
+                queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, 
null, _serverRoutingStatsManager,
+                    _brokerQueryEventListener);
       }
     }
 
@@ -312,7 +321,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
       // TODO: decouple protocol and engine selection.
       multiStageBrokerRequestHandler =
           new MultiStageBrokerRequestHandler(_brokerConf, brokerId, 
_routingManager, _accessControlFactory,
-              queryQuotaManager, tableCache, _brokerMetrics);
+              queryQuotaManager, tableCache, _brokerMetrics, 
_brokerQueryEventListener);
     }
 
     _brokerRequestHandler = new BrokerRequestHandlerDelegate(brokerId, 
singleStageBrokerRequestHandler,
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 420dc15eb2..47ffeddedf 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -90,6 +90,7 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.trace.Tracing;
@@ -138,6 +139,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   protected final long _brokerTimeoutMs;
   protected final int _queryResponseLimit;
   protected final QueryLogger _queryLogger;
+  protected final BrokerQueryEventListener _brokerQueryEventListener;
 
   private final boolean _disableGroovy;
   private final boolean _useApproximateFunction;
@@ -148,7 +150,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
 
   public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, 
BrokerRoutingManager routingManager,
       AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
-      BrokerMetrics brokerMetrics) {
+      BrokerMetrics brokerMetrics, BrokerQueryEventListener 
brokerQueryEventListener) {
     _brokerId = brokerId;
     _brokerIdGenerator = new BrokerRequestIdGenerator(brokerId);
     _config = config;
@@ -172,6 +174,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     boolean enableQueryCancellation =
         
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
     _queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;
+    _brokerQueryEventListener = brokerQueryEventListener;
     LOGGER.info(
         "Broker Id: {}, timeout: {}ms, query response limit: {}, query log 
length: {}, query log max rate: {}qps, "
             + "enabling query cancellation: {}", _brokerId, _brokerTimeoutMs, 
_queryResponseLimit,
@@ -222,8 +225,8 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
         // Unexpected server responses are collected and returned as exception.
         if (status != 200 && status != 404) {
           String responseString = 
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
-          throw new Exception(String.format("Unexpected status=%d and 
response='%s' from uri='%s'", status,
-              responseString, uri));
+          throw new Exception(
+              String.format("Unexpected status=%d and response='%s' from 
uri='%s'", status, responseString, uri));
         }
         if (serverResponses != null) {
           serverResponses.put(uri.getHost() + ":" + uri.getPort(), status);
@@ -251,19 +254,23 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       throws Exception {
     requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
 
+    long requestId = _brokerIdGenerator.get();
+    requestContext.setRequestId(requestId);
+
     // First-stage access control to prevent unauthenticated requests from 
using up resources. Secondary table-level
     // check comes later.
     boolean hasAccess = 
_accessControlFactory.create().hasAccess(requesterIdentity);
     if (!hasAccess) {
       
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR,
 1);
       requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
+      _brokerQueryEventListener.onQueryCompletion(requestContext);
       throw new WebApplicationException("Permission denied", 
Response.Status.FORBIDDEN);
     }
 
-    long requestId = _brokerIdGenerator.get();
-    requestContext.setRequestId(requestId);
     JsonNode sql = request.get(Broker.Request.SQL);
     if (sql == null) {
+      requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
+      _brokerQueryEventListener.onQueryCompletion(requestContext);
       throw new BadQueryRequestException("Failed to find 'sql' in the request: 
" + request);
     }
     String query = sql.asText();
@@ -282,6 +289,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     brokerResponse.setRequestId(String.valueOf(requestId));
     brokerResponse.setBrokerId(_brokerId);
     brokerResponse.setBrokerReduceTimeMs(requestContext.getReduceTimeMillis());
+    _brokerQueryEventListener.onQueryCompletion(requestContext);
     return brokerResponse;
   }
 
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index 56be2ea4fb..b2e36d16ef 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -42,6 +42,7 @@ import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,8 +62,9 @@ public class GrpcBrokerRequestHandler extends 
BaseBrokerRequestHandler {
   // TODO: Support TLS
   public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId, 
BrokerRoutingManager routingManager,
       AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
-      BrokerMetrics brokerMetrics, TlsConfig tlsConfig) {
-    super(config, brokerId, routingManager, accessControlFactory, 
queryQuotaManager, tableCache, brokerMetrics);
+      BrokerMetrics brokerMetrics, TlsConfig tlsConfig, 
BrokerQueryEventListener brokerQueryEventListener) {
+    super(config, brokerId, routingManager, accessControlFactory, 
queryQuotaManager, tableCache, brokerMetrics,
+        brokerQueryEventListener);
     LOGGER.info("Using Grpc BrokerRequestHandler.");
     _grpcConfig = GrpcConfig.buildGrpcQueryConfig(config);
 
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index e352ce906e..7fb9b24c14 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -62,6 +62,7 @@ import 
org.apache.pinot.query.service.dispatch.QueryDispatcher;
 import org.apache.pinot.query.type.TypeFactory;
 import org.apache.pinot.query.type.TypeSystem;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -77,10 +78,13 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
   private final MailboxService _mailboxService;
   private final QueryDispatcher _queryDispatcher;
 
-  public MultiStageBrokerRequestHandler(PinotConfiguration config, String 
brokerId, BrokerRoutingManager routingManager,
-      AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
-      BrokerMetrics brokerMetrics) {
-    super(config, brokerId, routingManager, accessControlFactory, 
queryQuotaManager, tableCache, brokerMetrics);
+
+  public MultiStageBrokerRequestHandler(PinotConfiguration config, String 
brokerId,
+      BrokerRoutingManager routingManager, AccessControlFactory 
accessControlFactory,
+      QueryQuotaManager queryQuotaManager, TableCache tableCache, 
BrokerMetrics brokerMetrics,
+      BrokerQueryEventListener brokerQueryEventListener) {
+    super(config, brokerId, routingManager, accessControlFactory, 
queryQuotaManager, tableCache,
+        brokerMetrics, brokerQueryEventListener);
     LOGGER.info("Using Multi-stage BrokerRequestHandler.");
     String hostname = 
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
     int port = 
Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index d78aa5ffe8..3ad8648442 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -50,6 +50,7 @@ import org.apache.pinot.core.transport.ServerResponse;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -72,8 +73,10 @@ public class SingleConnectionBrokerRequestHandler extends 
BaseBrokerRequestHandl
   public SingleConnectionBrokerRequestHandler(PinotConfiguration config, 
String brokerId,
       BrokerRoutingManager routingManager, AccessControlFactory 
accessControlFactory,
       QueryQuotaManager queryQuotaManager, TableCache tableCache, 
BrokerMetrics brokerMetrics, NettyConfig nettyConfig,
-      TlsConfig tlsConfig, ServerRoutingStatsManager 
serverRoutingStatsManager) {
-    super(config, brokerId, routingManager, accessControlFactory, 
queryQuotaManager, tableCache, brokerMetrics);
+      TlsConfig tlsConfig, ServerRoutingStatsManager serverRoutingStatsManager,
+      BrokerQueryEventListener brokerQueryEventListener) {
+    super(config, brokerId, routingManager, accessControlFactory, 
queryQuotaManager, tableCache, brokerMetrics,
+        brokerQueryEventListener);
     LOGGER.info("Using Netty BrokerRequestHandler.");
 
     _brokerReduceService = new BrokerReduceService(_config);
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
index f2ee4cd03e..e38db6e020 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
@@ -210,7 +210,8 @@ public class BaseBrokerRequestHandlerTest {
     BaseBrokerRequestHandler requestHandler =
         new BaseBrokerRequestHandler(config, "testBrokerId", routingManager, 
new AllowAllAccessControlFactory(),
             queryQuotaManager, tableCache,
-            new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), 
true, Collections.emptySet())) {
+            new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), 
true,
+                Collections.emptySet()), null) {
           @Override
           public void start() {
           }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
index 78c059c696..749afeb233 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
@@ -31,6 +31,7 @@ import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import 
org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.trace.Tracing;
@@ -183,7 +184,8 @@ public class LiteralOnlyBrokerRequestTest {
     SingleConnectionBrokerRequestHandler requestHandler =
         new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), 
"testBrokerId", null, ACCESS_CONTROL_FACTORY,
             null, null, new BrokerMetrics("", 
PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
-            null, null, mock(ServerRoutingStatsManager.class));
+            null, null, mock(ServerRoutingStatsManager.class),
+                
PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener());
 
     long randNum = RANDOM.nextLong();
     byte[] randBytes = new byte[12];
@@ -211,7 +213,8 @@ public class LiteralOnlyBrokerRequestTest {
     SingleConnectionBrokerRequestHandler requestHandler =
         new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), 
"testBrokerId", null, ACCESS_CONTROL_FACTORY,
             null, null, new BrokerMetrics("", 
PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
-            null, null, mock(ServerRoutingStatsManager.class));
+            null, null, mock(ServerRoutingStatsManager.class),
+                
PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener());
     long currentTsMin = System.currentTimeMillis();
     JsonNode request = JsonUtils.stringToJsonNode(
         "{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 
'yyyy-MM-dd z') as firstDayOf2020\"}");
@@ -418,7 +421,8 @@ public class LiteralOnlyBrokerRequestTest {
     SingleConnectionBrokerRequestHandler requestHandler =
         new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), 
"testBrokerId", null, ACCESS_CONTROL_FACTORY,
             null, null, new BrokerMetrics("", 
PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
-            null, null, mock(ServerRoutingStatsManager.class));
+            null, null, mock(ServerRoutingStatsManager.class),
+                
PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener());
 
     // Test 1: select constant
     JsonNode request = JsonUtils.stringToJsonNode("{\"sql\":\"EXPLAIN PLAN FOR 
SELECT 1.5, 'test'\"}");
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
index fe37953ae0..45d41ffc6f 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.broker.routing.BrokerRoutingManager;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import 
org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory;
 import org.apache.pinot.spi.trace.DefaultRequestContext;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -65,8 +66,9 @@ public class MultiStageBrokerRequestHandlerTest {
     
_config.setProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
 "12345");
     _accessControlFactory = new AllowAllAccessControlFactory();
     _requestHandler =
-        new MultiStageBrokerRequestHandler(_config, "Broker_localhost", 
_routingManager, _accessControlFactory,
-            _queryQuotaManager, _tableCache, _brokerMetrics);
+        new MultiStageBrokerRequestHandler(_config, "Broker_localhost", 
_routingManager,
+                _accessControlFactory, _queryQuotaManager, _tableCache, 
_brokerMetrics,
+                
PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener());
   }
 
   @Test
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/BrokerQueryEventListener.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/BrokerQueryEventListener.java
new file mode 100644
index 0000000000..ea07a2df64
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/BrokerQueryEventListener.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.eventlistener.query;
+
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.RequestContext;
+
+
+public interface BrokerQueryEventListener {
+
+  void init(PinotConfiguration eventListenerConfiguration);
+  void onQueryCompletion(RequestContext requestContext);
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/NoOpBrokerQueryEventListener.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/NoOpBrokerQueryEventListener.java
new file mode 100644
index 0000000000..5b0fbd3a40
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/NoOpBrokerQueryEventListener.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.eventlistener.query;
+
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.RequestContext;
+
+
+public class NoOpBrokerQueryEventListener implements BrokerQueryEventListener {
+
+  @Override
+  public void init(PinotConfiguration eventListenerConfiguration) {
+    // Intentionally a no-op
+  }
+
+  @Override
+  public void onQueryCompletion(RequestContext requestContext) {
+    // Intentionally a no-op
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java
new file mode 100644
index 0000000000..828f214a16
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.eventlistener.query;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_BROKER_EVENT_LISTENER_CLASS_NAME;
+import static 
org.apache.pinot.spi.utils.CommonConstants.DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME;
+
+
+public class PinotBrokerQueryEventListenerFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotBrokerQueryEventListenerFactory.class);
+  private static BrokerQueryEventListener _brokerQueryEventListener = null;
+
+  private PinotBrokerQueryEventListenerFactory() {
+  }
+
+  /**
+   * Initializes the BrokerQueryEventListener and registers it.
+   */
+  @VisibleForTesting
+  public synchronized static void init(PinotConfiguration 
eventListenerConfiguration) {
+    // Initializes BrokerQueryEventListener.
+    initializeBrokerQueryEventListener(eventListenerConfiguration);
+  }
+
+  /**
+   * Initializes PinotBrokerQueryEventListener with event-listener 
configurations.
+   * @param eventListenerConfiguration The subset of the configuration 
containing the event-listener-related keys
+   */
+  private static void initializeBrokerQueryEventListener(PinotConfiguration 
eventListenerConfiguration) {
+    String brokerQueryEventListenerClassName =
+        
eventListenerConfiguration.getProperty(CONFIG_OF_BROKER_EVENT_LISTENER_CLASS_NAME,
+            DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME);
+    LOGGER.info("{} will be initialized as the PinotBrokerQueryEventListener", 
brokerQueryEventListenerClassName);
+
+    Optional<Class<?>> clazzFound;
+    try {
+      clazzFound = 
Optional.of(Class.forName(brokerQueryEventListenerClassName));
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("Failed to initialize 
BrokerQueryEventListener. "
+          + "Please check if any pinot-event-listener related jar is actually 
added to the classpath.");
+    }
+
+    clazzFound.ifPresent(clazz -> {
+      try {
+        BrokerQueryEventListener brokerQueryEventListener = 
(BrokerQueryEventListener) clazz.newInstance();
+        brokerQueryEventListener.init(eventListenerConfiguration);
+        registerBrokerEventListener(brokerQueryEventListener);
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while initializing event listener 
registry: {}, skipping it", clazz, e);
+      }
+    });
+
+    Preconditions.checkState(_brokerQueryEventListener != null, "Failed to 
initialize BrokerQueryEventListener. "
+        + "Please check if any pinot-event-listener related jar is actually 
added to the classpath.");
+  }
+
+  /**
+   * Registers a broker event listener.
+   */
+  private static void registerBrokerEventListener(BrokerQueryEventListener 
brokerQueryEventListener) {
+    LOGGER.info("Registering broker event listener : {}", 
brokerQueryEventListener.getClass().getName());
+    _brokerQueryEventListener = brokerQueryEventListener;
+  }
+
+  /**
+   * Returns the brokerQueryEventListener. If the BrokerQueryEventListener is 
null,
+   * first creates and initializes the BrokerQueryEventListener.
+   * @param eventListenerConfiguration event-listener configs
+   */
+  public static synchronized BrokerQueryEventListener 
getBrokerQueryEventListener(
+      PinotConfiguration eventListenerConfiguration) {
+    if (_brokerQueryEventListener == null) {
+      init(eventListenerConfiguration);
+    }
+    return _brokerQueryEventListener;
+  }
+
+  @VisibleForTesting
+  public static BrokerQueryEventListener getBrokerQueryEventListener() {
+    return getBrokerQueryEventListener(new 
PinotConfiguration(Collections.emptyMap()));
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 56e6dcf999..80c1816702 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -43,8 +43,11 @@ public class CommonConstants {
 
   public static final String UNKNOWN = "unknown";
   public static final String CONFIG_OF_METRICS_FACTORY_CLASS_NAME = 
"factory.className";
+  public static final String CONFIG_OF_BROKER_EVENT_LISTENER_CLASS_NAME = 
"factory.className";
   public static final String DEFAULT_METRICS_FACTORY_CLASS_NAME =
       "org.apache.pinot.plugin.metrics.yammer.YammerMetricsFactory";
+  public static final String DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME =
+      "org.apache.pinot.spi.eventlistener.query.NoOpBrokerQueryEventListener";
 
   public static final String SWAGGER_AUTHORIZATION_KEY = "oauth";
   public static final String CONFIG_OF_SWAGGER_RESOURCES_PATH = 
"META-INF/resources/webjars/swagger-ui/5.1.0/";
@@ -205,6 +208,7 @@ public class CommonConstants {
     public static final String ROUTING_TABLE_CONFIG_PREFIX = 
"pinot.broker.routing.table";
     public static final String ACCESS_CONTROL_CONFIG_PREFIX = 
"pinot.broker.access.control";
     public static final String METRICS_CONFIG_PREFIX = "pinot.broker.metrics";
+    public static final String EVENT_LISTENER_CONFIG_PREFIX = 
"pinot.broker.event.listener";
     public static final String CONFIG_OF_METRICS_NAME_PREFIX = 
"pinot.broker.metrics.prefix";
     public static final String DEFAULT_METRICS_NAME_PREFIX = "pinot.broker.";
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to