This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9c64672340 Move brokerId extraction to BaseBrokerStarter (#9965)
9c64672340 is described below
commit 9c646723403d8a9e2d180ba68ad306489bff48b3
Author: Jialiang Li <[email protected]>
AuthorDate: Sun Dec 11 22:57:54 2022 -0800
Move brokerId extraction to BaseBrokerStarter (#9965)
Co-authored-by: Jack Li(Analytics Engineering) <[email protected]>
---
.../broker/broker/helix/BaseBrokerStarter.java | 23 ++++++++++++++++------
.../requesthandler/BaseBrokerRequestHandler.java | 15 ++------------
.../BrokerRequestHandlerDelegate.java | 5 ++++-
.../requesthandler/GrpcBrokerRequestHandler.java | 4 ++--
.../MultiStageBrokerRequestHandler.java | 12 +++++------
.../SingleConnectionBrokerRequestHandler.java | 10 +++++-----
.../BaseBrokerRequestHandlerTest.java | 2 +-
.../LiteralOnlyBrokerRequestTest.java | 18 ++++++++---------
8 files changed, 46 insertions(+), 43 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 694e6fde1c..6407fd0b29 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -20,6 +20,7 @@ package org.apache.pinot.broker.broker.helix;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -264,21 +265,22 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf,
Broker.BROKER_NETTY_PREFIX);
// Create Broker request handler.
+ String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID,
getDefaultBrokerId());
String brokerRequestHandlerType =
_brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE,
Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE);
BrokerRequestHandler singleStageBrokerRequestHandler = null;
if
(brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE))
{
singleStageBrokerRequestHandler =
- new GrpcBrokerRequestHandler(_brokerConf, _routingManager,
_accessControlFactory, queryQuotaManager,
+ new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager,
_accessControlFactory, queryQuotaManager,
tableCache, _brokerMetrics, null);
} else { // default request handler type, e.g. netty
if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
singleStageBrokerRequestHandler =
- new SingleConnectionBrokerRequestHandler(_brokerConf,
_routingManager, _accessControlFactory,
+ new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId,
_routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults,
tlsDefaults, _serverRoutingStatsManager);
} else {
singleStageBrokerRequestHandler =
- new SingleConnectionBrokerRequestHandler(_brokerConf,
_routingManager, _accessControlFactory,
+ new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId,
_routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults,
null, _serverRoutingStatsManager);
}
}
@@ -289,11 +291,11 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
// worker requires both the "Netty port" for protocol transport; and
"GRPC port" for mailbox transport.
// TODO: decouple protocol and engine selection.
multiStageBrokerRequestHandler =
- new MultiStageBrokerRequestHandler(_brokerConf, _routingManager,
_accessControlFactory, queryQuotaManager,
- tableCache, _brokerMetrics);
+ new MultiStageBrokerRequestHandler(_brokerConf, brokerId,
_routingManager, _accessControlFactory,
+ queryQuotaManager, tableCache, _brokerMetrics);
}
- _brokerRequestHandler = new
BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler,
+ _brokerRequestHandler = new BrokerRequestHandlerDelegate(brokerId,
singleStageBrokerRequestHandler,
multiStageBrokerRequestHandler, _brokerMetrics);
_brokerRequestHandler.start();
String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
@@ -433,6 +435,15 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
new ServiceStatus.LifecycleServiceStatusCallback(this::isStarting,
this::isShuttingDown))));
}
+ private String getDefaultBrokerId() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while getting default broker Id", e);
+ return "";
+ }
+ }
+
@Override
public void stop() {
LOGGER.info("Shutting down Pinot broker");
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index ae88689a51..7a704bcffa 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -21,7 +21,6 @@ package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -129,9 +128,10 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
private final boolean _enableDistinctCountBitmapOverride;
private final Map<Long, QueryServers> _queriesById;
- public BaseBrokerRequestHandler(PinotConfiguration config,
BrokerRoutingManager routingManager,
+ public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId,
BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics) {
+ _brokerId = brokerId;
_config = config;
_routingManager = routingManager;
_accessControlFactory = accessControlFactory;
@@ -146,7 +146,6 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
_enableDistinctCountBitmapOverride =
_config.getProperty(CommonConstants.Helix.ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY,
false);
- _brokerId = config.getProperty(Broker.CONFIG_OF_BROKER_ID,
getDefaultBrokerId());
_brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS,
Broker.DEFAULT_BROKER_TIMEOUT_MS);
_queryResponseLimit =
config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT,
Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
@@ -160,15 +159,6 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
_queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(),
enableQueryCancellation);
}
- private String getDefaultBrokerId() {
- try {
- return InetAddress.getLocalHost().getHostName();
- } catch (Exception e) {
- LOGGER.error("Caught exception while getting default broker Id", e);
- return "";
- }
- }
-
@Override
public Map<Long, String> getRunningQueries() {
Preconditions.checkState(_queriesById != null, "Query cancellation is not
enabled on broker");
@@ -240,7 +230,6 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
@Nullable RequesterIdentity requesterIdentity, RequestContext
requestContext)
throws Exception {
long requestId = _requestIdGenerator.incrementAndGet();
- requestContext.setBrokerId(_brokerId);
requestContext.setRequestId(requestId);
requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index c6eee5f04f..3e6a0598be 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -49,9 +49,11 @@ public class BrokerRequestHandlerDelegate implements
BrokerRequestHandler {
private final BrokerRequestHandler _singleStageBrokerRequestHandler;
private final BrokerRequestHandler _multiStageWorkerRequestHandler;
private final BrokerMetrics _brokerMetrics;
+ private final String _brokerId;
- public BrokerRequestHandlerDelegate(BrokerRequestHandler
singleStageBrokerRequestHandler,
+ public BrokerRequestHandlerDelegate(String brokerId, BrokerRequestHandler
singleStageBrokerRequestHandler,
@Nullable BrokerRequestHandler multiStageWorkerRequestHandler,
BrokerMetrics brokerMetrics) {
+ _brokerId = brokerId;
_singleStageBrokerRequestHandler = singleStageBrokerRequestHandler;
_multiStageWorkerRequestHandler = multiStageWorkerRequestHandler;
_brokerMetrics = brokerMetrics;
@@ -81,6 +83,7 @@ public class BrokerRequestHandlerDelegate implements
BrokerRequestHandler {
public BrokerResponse handleRequest(JsonNode request, @Nullable
SqlNodeAndOptions sqlNodeAndOptions,
@Nullable RequesterIdentity requesterIdentity, RequestContext
requestContext)
throws Exception {
+ requestContext.setBrokerId(_brokerId);
if (sqlNodeAndOptions == null) {
try {
sqlNodeAndOptions =
RequestUtils.parseQuery(request.get(CommonConstants.Broker.Request.SQL).asText(),
request);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index 2af8027668..95d17c955b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -59,10 +59,10 @@ public class GrpcBrokerRequestHandler extends
BaseBrokerRequestHandler {
private final PinotStreamingQueryClient _streamingQueryClient;
// TODO: Support TLS
- public GrpcBrokerRequestHandler(PinotConfiguration config,
BrokerRoutingManager routingManager,
+ public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId,
BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics, TlsConfig tlsConfig) {
- super(config, routingManager, accessControlFactory, queryQuotaManager,
tableCache, brokerMetrics);
+ super(config, brokerId, routingManager, accessControlFactory,
queryQuotaManager, tableCache, brokerMetrics);
LOGGER.info("Using Grpc BrokerRequestHandler.");
_grpcConfig = GrpcConfig.buildGrpcQueryConfig(config);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 6cdd68cd80..34acd15b48 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -72,15 +72,16 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
private final QueryEnvironment _queryEnvironment;
private final QueryDispatcher _queryDispatcher;
- public MultiStageBrokerRequestHandler(PinotConfiguration config,
BrokerRoutingManager routingManager,
- AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
- BrokerMetrics brokerMetrics) {
- super(config, routingManager, accessControlFactory, queryQuotaManager,
tableCache, brokerMetrics);
+ public MultiStageBrokerRequestHandler(PinotConfiguration config, String
brokerIdFromConfig,
+ BrokerRoutingManager routingManager, AccessControlFactory
accessControlFactory,
+ QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics) {
+ super(config, brokerIdFromConfig, routingManager, accessControlFactory,
queryQuotaManager, tableCache,
+ brokerMetrics);
LOGGER.info("Using Multi-stage BrokerRequestHandler.");
String reducerHostname =
config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME);
if (reducerHostname == null) {
// use broker ID as host name, but remove the
- String brokerId =
config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID);
+ String brokerId = brokerIdFromConfig;
brokerId =
brokerId.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) ?
brokerId.substring(
CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : brokerId;
brokerId = StringUtils.split(brokerId, "_").length > 1 ?
StringUtils.split(brokerId, "_")[0] : brokerId;
@@ -108,7 +109,6 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
@Nullable RequesterIdentity requesterIdentity, RequestContext
requestContext)
throws Exception {
long requestId = _requestIdGenerator.incrementAndGet();
- requestContext.setBrokerId(_brokerId);
requestContext.setRequestId(requestId);
requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index c34b8dda6a..d78aa5ffe8 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -69,11 +69,11 @@ public class SingleConnectionBrokerRequestHandler extends
BaseBrokerRequestHandl
private final QueryRouter _queryRouter;
private final FailureDetector _failureDetector;
- public SingleConnectionBrokerRequestHandler(PinotConfiguration config,
BrokerRoutingManager routingManager,
- AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
- BrokerMetrics brokerMetrics, NettyConfig nettyConfig, TlsConfig
tlsConfig,
- ServerRoutingStatsManager serverRoutingStatsManager) {
- super(config, routingManager, accessControlFactory, queryQuotaManager,
tableCache, brokerMetrics);
+ public SingleConnectionBrokerRequestHandler(PinotConfiguration config,
String brokerId,
+ BrokerRoutingManager routingManager, AccessControlFactory
accessControlFactory,
+ QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics, NettyConfig nettyConfig,
+ TlsConfig tlsConfig, ServerRoutingStatsManager
serverRoutingStatsManager) {
+ super(config, brokerId, routingManager, accessControlFactory,
queryQuotaManager, tableCache, brokerMetrics);
LOGGER.info("Using Netty BrokerRequestHandler.");
_brokerReduceService = new BrokerReduceService(_config);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
index 88f72600e5..d2ea5c9b7a 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
@@ -206,7 +206,7 @@ public class BaseBrokerRequestHandlerTest {
PinotConfiguration config =
new
PinotConfiguration(Collections.singletonMap("pinot.broker.enable.query.cancellation",
"true"));
BaseBrokerRequestHandler requestHandler =
- new BaseBrokerRequestHandler(config, routingManager, new
AllowAllAccessControlFactory(),
+ new BaseBrokerRequestHandler(config, null, routingManager, new
AllowAllAccessControlFactory(),
queryQuotaManager, tableCache,
new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(),
true, Collections.emptySet())) {
@Override
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
index 389db74fb6..a9ee2a5dee 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
@@ -181,9 +181,9 @@ public class LiteralOnlyBrokerRequestTest {
public void testBrokerRequestHandler()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
- new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
null, ACCESS_CONTROL_FACTORY, null, null,
- new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(),
true, Collections.emptySet()),
- null, null, mock(ServerRoutingStatsManager.class));
+ new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
null, null, ACCESS_CONTROL_FACTORY, null,
+ null, new BrokerMetrics("",
PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
+ null, mock(ServerRoutingStatsManager.class));
long randNum = RANDOM.nextLong();
byte[] randBytes = new byte[12];
@@ -209,9 +209,9 @@ public class LiteralOnlyBrokerRequestTest {
public void testBrokerRequestHandlerWithAsFunction()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
- new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
null, ACCESS_CONTROL_FACTORY, null, null,
- new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(),
true, Collections.emptySet()),
- null, null, mock(ServerRoutingStatsManager.class));
+ new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
null, null, ACCESS_CONTROL_FACTORY, null,
+ null, new BrokerMetrics("",
PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
+ null, mock(ServerRoutingStatsManager.class));
long currentTsMin = System.currentTimeMillis();
JsonNode request = JsonUtils.stringToJsonNode(
"{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC',
'yyyy-MM-dd z') as firstDayOf2020\"}");
@@ -416,9 +416,9 @@ public class LiteralOnlyBrokerRequestTest {
public void testExplainPlanLiteralOnly()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
- new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
null, ACCESS_CONTROL_FACTORY, null, null,
- new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(),
true, Collections.emptySet()),
- null, null, mock(ServerRoutingStatsManager.class));
+ new SingleConnectionBrokerRequestHandler(new PinotConfiguration(),
null, null, ACCESS_CONTROL_FACTORY, null,
+ null, new BrokerMetrics("",
PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
+ null, mock(ServerRoutingStatsManager.class));
// Test 1: select constant
JsonNode request = JsonUtils.stringToJsonNode("{\"sql\":\"EXPLAIN PLAN FOR
SELECT 1.5, 'test'\"}");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]