This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0839fb1 Refactor HelixBrokerStarter to separate constructor and
start() (#4100)
0839fb1 is described below
commit 0839fb1ee565abd7e6058e1e0fc7372cedd2fa0b
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Apr 17 13:11:06 2019 -0700
Refactor HelixBrokerStarter to separate constructor and start() (#4100)
For the starter class, the constructor should only set the properties
and not start the service; starting should be done by start() instead.
- Refactor HelixBrokerStarter to separate constructor and start()
- Modify shutdown() to shut down all components properly
- Change the APIs to plug in custom change handlers without
extending the class
NOTE: THIS IS A BACKWARD-INCOMPATIBLE CHANGE
- Need to call start() separately in order to start the broker
- Order of arguments changed in constructor (the nullable argument is moved
to the last position. This is intentional so that users can find the
incompatibility more easily during compilation, and thus add the start() call)
---
.../pinot/broker/broker/BrokerServerBuilder.java | 7 +-
.../broker/broker/helix/ClusterChangeHandler.java | 10 +
.../broker/broker/helix/HelixBrokerStarter.java | 339 ++++++++++-----------
.../broker/helix/LiveInstanceChangeHandler.java | 10 +-
.../broker/queryquota/TableQueryQuotaManager.java | 29 +-
.../routing/HelixExternalViewBasedRouting.java | 49 +--
.../broker/broker/HelixBrokerStarterTest.java | 3 +-
.../queryquota/TableQueryQuotaManagerTest.java | 3 +-
.../broker/routing/RandomRoutingTableTest.java | 5 +-
.../pinot/broker/routing/RoutingTableTest.java | 36 ++-
.../apache/pinot/common/utils/CommonConstants.java | 2 +
.../apache/pinot/controller/ControllerStarter.java | 3 +-
.../pinot/integration/tests/ClusterTest.java | 7 +-
.../server/starter/helix/HelixServerStarter.java | 3 +-
.../tools/admin/command/StartBrokerCommand.java | 10 +-
.../pinot/tools/perf/PerfBenchmarkDriver.java | 2 +-
16 files changed, 276 insertions(+), 242 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
index 387e214..a63e954 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
@@ -109,11 +109,12 @@ public class BrokerServerBuilder {
_state.set(State.STARTING);
_brokerRequestHandler.start();
- _brokerAdminApplication.start(_config
- .getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT,
CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT));
+ int brokerQueryPort =
+ _config.getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT,
CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT);
+ _brokerAdminApplication.start(brokerQueryPort);
_state.set(State.RUNNING);
- LOGGER.info("Pinot Broker started");
+ LOGGER.info("Pinot Broker is started and listening on port {} for API
requests", brokerQueryPort);
}
public void stop() {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
index d4b82da..9d335a6 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
@@ -19,14 +19,24 @@
package org.apache.pinot.broker.broker.helix;
import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.annotations.InterfaceAudience;
+import org.apache.pinot.annotations.InterfaceStability;
/**
* Handles cluster changes such as external view changes, instance config
changes, live instance changes etc.
*/
[email protected]
[email protected]
public interface ClusterChangeHandler {
/**
+ * Initializes the cluster change handler with the given connected Helix
manager.
+ */
+ void init(HelixManager helixManager);
+
+ /**
* Processes the cluster change of the given type (e.g. EXTERNAL_VIEW,
INSTANCE_CONFIG, LIVE_INSTANCE).
*/
void processClusterChange(HelixConstants.ChangeType changeType);
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index 7bc9c8d..d684d09 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -21,7 +21,6 @@ package org.apache.pinot.broker.broker.helix;
import com.google.common.collect.ImmutableList;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,6 +47,7 @@ import
org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.common.utils.ServiceStatus;
@@ -55,100 +55,161 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Helix Broker Startable
- *
- *
- */
+@SuppressWarnings("unused")
public class HelixBrokerStarter {
private static final Logger LOGGER =
LoggerFactory.getLogger(HelixBrokerStarter.class);
private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY =
"pinot.broker.routing.table";
- // Spectator Helix manager handles the custom change listeners, properties
read/write
- private final HelixManager _spectatorHelixManager;
- // Participant Helix manager handles Helix functionality such as state
transitions and messages
- private final HelixManager _participantHelixManager;
-
private final Configuration _brokerConf;
- private final HelixAdmin _helixAdmin;
- private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
- private final HelixDataAccessor _helixDataAccessor;
- private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
- private final BrokerServerBuilder _brokerServerBuilder;
- private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
- private final MetricsRegistry _metricsRegistry;
- private final TableQueryQuotaManager _tableQueryQuotaManager;
- private final ClusterChangeMediator _clusterChangeMediator;
- private final TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler;
-
- // Set after broker is started, which is actually in the constructor.
+ private final String _clusterName;
+ private final String _zkServers;
+ private final String _brokerId;
+
+ private final List<ClusterChangeHandler> _externalViewChangeHandlers = new
ArrayList<>();
+ private final List<ClusterChangeHandler> _instanceConfigChangeHandlers = new
ArrayList<>();
+ private final List<ClusterChangeHandler> _liveInstanceChangeHandlers = new
ArrayList<>();
+
+ // Spectator Helix manager handles the custom change listeners, properties
read/write
+ private HelixManager _spectatorHelixManager;
+ private HelixAdmin _helixAdmin;
+ private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+ private HelixDataAccessor _helixDataAccessor;
+
+ // Cluster change handlers
+ private HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
+ private TableQueryQuotaManager _tableQueryQuotaManager;
+ private LiveInstanceChangeHandler _liveInstanceChangeHandler;
+ private ClusterChangeMediator _clusterChangeMediator;
+
+ private BrokerServerBuilder _brokerServerBuilder;
private AccessControlFactory _accessControlFactory;
+ private MetricsRegistry _metricsRegistry;
+
+ // Participant Helix manager handles Helix functionality such as state
transitions and messages
+ private HelixManager _participantHelixManager;
+ private TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler;
- public HelixBrokerStarter(String helixClusterName, String zkServer,
Configuration brokerConf)
+ public HelixBrokerStarter(Configuration brokerConf, String clusterName,
String zkServer)
throws Exception {
- this(null, helixClusterName, zkServer, brokerConf);
+ this(brokerConf, clusterName, zkServer, null);
}
- public HelixBrokerStarter(@Nullable String brokerHost, String
helixClusterName, String zkServer,
- Configuration brokerConf)
+ public HelixBrokerStarter(Configuration brokerConf, String clusterName,
String zkServer, @Nullable String brokerHost)
throws Exception {
- LOGGER.info("Starting Pinot broker");
-
_brokerConf = brokerConf;
+ setupHelixSystemProperties();
+
+ _clusterName = clusterName;
+
+ // Remove all white-spaces from the list of zkServers (if any).
+ _zkServers = zkServer.replaceAll("\\s+", "");
if (brokerHost == null) {
brokerHost = NetUtil.getHostAddress();
}
-
- String brokerId =
_brokerConf.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
+ _brokerId =
_brokerConf.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" +
_brokerConf
.getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT,
CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT));
+ _brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID,
_brokerId);
+ }
- _brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID,
brokerId);
- setupHelixSystemProperties();
+ private void setupHelixSystemProperties() {
+ // NOTE: Helix will disconnect the manager and disable the instance if it
detects flapping (too frequent disconnect
+ // from ZooKeeper). Setting flapping time window to a small value can
avoid this from happening. Helix ignores the
+ // non-positive value, so set the default value as 1.
+ System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _brokerConf
+
.getString(CommonConstants.Helix.CONFIG_OF_BROKER_FLAPPING_TIME_WINDOW_MS,
+ CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
+ }
- // Remove all white-spaces from the list of zkServers (if any).
- String zkServers = zkServer.replaceAll("\\s+", "");
+ /**
+ * Adds an external view change handler to handle Helix external view change
callbacks.
+ * <p>NOTE: all change handlers will be run in a single thread, so any slow
change handler can block other change
+ * handlers from running. For slow change handler, make it asynchronous.
+ */
+ public void addExternalViewChangeHandler(ClusterChangeHandler
externalViewChangeHandler) {
+ _externalViewChangeHandlers.add(externalViewChangeHandler);
+ }
+
+ /**
+ * Adds an instance config change handler to handle Helix instance config
change callbacks.
+ * <p>NOTE: all change handlers will be run in a single thread, so any slow
change handler can block other change
+ * handlers from running. For slow change handler, make it asynchronous.
+ */
+ public void addInstanceConfigChangeHandler(ClusterChangeHandler
instanceConfigChangeHandler) {
+ _instanceConfigChangeHandlers.add(instanceConfigChangeHandler);
+ }
+
+ /**
+ * Adds a live instance change handler to handle Helix live instance change
callbacks.
+ * <p>NOTE: all change handlers will be run in a single thread, so any slow
change handler can block other change
+ * handlers from running. For slow change handler, make it asynchronous.
+ */
+ public void addLiveInstanceChangeHandler(ClusterChangeHandler
liveInstanceChangeHandler) {
+ _liveInstanceChangeHandlers.add(liveInstanceChangeHandler);
+ }
- LOGGER.info("Connecting Helix components");
- // Connect spectator Helix manager.
+ // TODO: refactor this logic into BrokerServerBuilder
+ public void start()
+ throws Exception {
+ LOGGER.info("Starting Pinot broker");
+
+ // Connect the spectator Helix manager
+ LOGGER.info("Connecting spectator Helix manager");
_spectatorHelixManager =
- HelixManagerFactory.getZKHelixManager(helixClusterName, brokerId,
InstanceType.SPECTATOR, zkServers);
+ HelixManagerFactory.getZKHelixManager(_clusterName, _brokerId,
InstanceType.SPECTATOR, _zkServers);
_spectatorHelixManager.connect();
_helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
_propertyStore = _spectatorHelixManager.getHelixPropertyStore();
_helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
- _helixExternalViewBasedRouting = new
HelixExternalViewBasedRouting(_propertyStore, _spectatorHelixManager,
- brokerConf.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
- _tableQueryQuotaManager = new
TableQueryQuotaManager(_spectatorHelixManager);
- _liveInstanceChangeHandler = new
LiveInstanceChangeHandler(_spectatorHelixManager);
- _brokerServerBuilder = startBroker(_brokerConf);
+ _helixExternalViewBasedRouting =
+ new
HelixExternalViewBasedRouting(_brokerConf.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
+ _helixExternalViewBasedRouting.init(_spectatorHelixManager);
+ _tableQueryQuotaManager = new TableQueryQuotaManager();
+ _tableQueryQuotaManager.init(_spectatorHelixManager);
+ _liveInstanceChangeHandler = new LiveInstanceChangeHandler();
+ _liveInstanceChangeHandler.init(_spectatorHelixManager);
+
+ // Set up the broker server builder
+ LOGGER.info("Setting up broker server builder");
+ _brokerServerBuilder = new BrokerServerBuilder(_brokerConf,
_helixExternalViewBasedRouting,
+ _helixExternalViewBasedRouting.getTimeBoundaryService(),
_liveInstanceChangeHandler, _tableQueryQuotaManager);
+ _accessControlFactory = _brokerServerBuilder.getAccessControlFactory();
_metricsRegistry = _brokerServerBuilder.getMetricsRegistry();
-
- // Initialize cluster change mediator
- Map<ChangeType, List<ClusterChangeHandler>> changeHandlersMap = new
HashMap<>();
- List<ClusterChangeHandler> externalViewChangeHandlers = new ArrayList<>();
- externalViewChangeHandlers.add(_helixExternalViewBasedRouting);
- externalViewChangeHandlers.add(_tableQueryQuotaManager);
-
externalViewChangeHandlers.addAll(getCustomExternalViewChangeHandlers(_spectatorHelixManager));
- changeHandlersMap.put(ChangeType.EXTERNAL_VIEW,
externalViewChangeHandlers);
- List<ClusterChangeHandler> instanceConfigChangeHandlers = new
ArrayList<>();
- instanceConfigChangeHandlers.add(_helixExternalViewBasedRouting);
-
instanceConfigChangeHandlers.addAll(getCustomInstanceConfigChangeHandlers(_spectatorHelixManager));
- changeHandlersMap.put(ChangeType.INSTANCE_CONFIG,
instanceConfigChangeHandlers);
- List<ClusterChangeHandler> liveInstanceChangeHandler = new ArrayList<>();
- liveInstanceChangeHandler.add(_liveInstanceChangeHandler);
-
liveInstanceChangeHandler.addAll(getCustomLiveInstanceChangeHandlers(_spectatorHelixManager));
- changeHandlersMap.put(ChangeType.LIVE_INSTANCE, liveInstanceChangeHandler);
- _clusterChangeMediator = new ClusterChangeMediator(changeHandlersMap,
_brokerServerBuilder.getBrokerMetrics());
+ BrokerMetrics brokerMetrics = _brokerServerBuilder.getBrokerMetrics();
+ _helixExternalViewBasedRouting.setBrokerMetrics(brokerMetrics);
+ _tableQueryQuotaManager.setBrokerMetrics(brokerMetrics);
+ _brokerServerBuilder.start();
+
+ // Initialize the cluster change mediator
+ LOGGER.info("Initializing cluster change mediator");
+ for (ClusterChangeHandler externalViewChangeHandler :
_externalViewChangeHandlers) {
+ externalViewChangeHandler.init(_spectatorHelixManager);
+ }
+ _externalViewChangeHandlers.add(_helixExternalViewBasedRouting);
+ _externalViewChangeHandlers.add(_tableQueryQuotaManager);
+ for (ClusterChangeHandler instanceConfigChangeHandler :
_instanceConfigChangeHandlers) {
+ instanceConfigChangeHandler.init(_spectatorHelixManager);
+ }
+ _instanceConfigChangeHandlers.add(_helixExternalViewBasedRouting);
+ for (ClusterChangeHandler liveInstanceChangeHandler :
_liveInstanceChangeHandlers) {
+ liveInstanceChangeHandler.init(_spectatorHelixManager);
+ }
+ _liveInstanceChangeHandlers.add(_liveInstanceChangeHandler);
+ Map<ChangeType, List<ClusterChangeHandler>> clusterChangeHandlersMap = new
HashMap<>();
+ clusterChangeHandlersMap.put(ChangeType.EXTERNAL_VIEW,
_externalViewChangeHandlers);
+ clusterChangeHandlersMap.put(ChangeType.INSTANCE_CONFIG,
_instanceConfigChangeHandlers);
+ clusterChangeHandlersMap.put(ChangeType.LIVE_INSTANCE,
_liveInstanceChangeHandlers);
+ _clusterChangeMediator = new
ClusterChangeMediator(clusterChangeHandlersMap, brokerMetrics);
_clusterChangeMediator.start();
_spectatorHelixManager.addExternalViewChangeListener(_clusterChangeMediator);
_spectatorHelixManager.addInstanceConfigChangeListener(_clusterChangeMediator);
_spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator);
- // Connect participant Helix manager.
+ // Connect the participant Helix manager
+ LOGGER.info("Connecting participant Helix manager");
_participantHelixManager =
- HelixManagerFactory.getZKHelixManager(helixClusterName, brokerId,
InstanceType.PARTICIPANT, zkServers);
+ HelixManagerFactory.getZKHelixManager(_clusterName, _brokerId,
InstanceType.PARTICIPANT, _zkServers);
StateMachineEngine stateMachineEngine =
_participantHelixManager.getStateMachineEngine();
StateModelFactory<?> stateModelFactory =
new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore,
_helixDataAccessor,
@@ -161,110 +222,69 @@ public class HelixBrokerStarter {
CommonConstants.Broker.DEFAULT_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL_MS));
_participantHelixManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
_tbiMessageHandler);
-
- addInstanceTagIfNeeded(helixClusterName, brokerId);
+ addInstanceTagIfNeeded();
+
brokerMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME,
+ () -> _participantHelixManager.isConnected() ? 1L : 0L);
+ _participantHelixManager
+ .addPreConnectCallback(() ->
brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS,
1L));
// Register the service status handler
+ LOGGER.info("Registering service status handler");
double minResourcePercentForStartup = _brokerConf
.getDouble(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START,
CommonConstants.Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START);
ServiceStatus.setServiceStatusCallback(new
ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
.of(new
ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_participantHelixManager,
- helixClusterName, brokerId, minResourcePercentForStartup),
+ _clusterName, _brokerId, minResourcePercentForStartup),
new
ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_participantHelixManager,
- helixClusterName, brokerId, minResourcePercentForStartup))));
+ _clusterName, _brokerId, minResourcePercentForStartup))));
- _brokerServerBuilder.getBrokerMetrics()
- .addCallbackGauge("helix.connected", () ->
_participantHelixManager.isConnected() ? 1L : 0L);
-
- _participantHelixManager.addPreConnectCallback(() ->
_brokerServerBuilder.getBrokerMetrics()
- .addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
- }
-
- private void setupHelixSystemProperties() {
- // NOTE: Helix will disconnect the manager and disable the instance if it
detects flapping (too frequent disconnect
- // from ZooKeeper). Setting flapping time window to a small value can
avoid this from happening. Helix ignores the
- // non-positive value, so set the default value as 1.
- System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _brokerConf
-
.getString(CommonConstants.Helix.CONFIG_OF_BROKER_FLAPPING_TIME_WINDOW_MS,
- CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
+ LOGGER.info("Finish starting Pinot broker");
}
- private void addInstanceTagIfNeeded(String clusterName, String instanceName)
{
+ private void addInstanceTagIfNeeded() {
InstanceConfig instanceConfig =
-
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().instanceConfig(instanceName));
+
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().instanceConfig(_brokerId));
List<String> instanceTags = instanceConfig.getTags();
if (instanceTags == null || instanceTags.isEmpty()) {
if (ZKMetadataProvider.getClusterTenantIsolationEnabled(_propertyStore))
{
- _helixAdmin.addInstanceTag(clusterName, instanceName,
+ _helixAdmin.addInstanceTag(_clusterName, _brokerId,
TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
} else {
- _helixAdmin.addInstanceTag(clusterName, instanceName,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+ _helixAdmin.addInstanceTag(_clusterName, _brokerId,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
}
}
}
- private BrokerServerBuilder startBroker(Configuration config) {
- BrokerServerBuilder brokerServerBuilder = new BrokerServerBuilder(config,
_helixExternalViewBasedRouting,
- _helixExternalViewBasedRouting.getTimeBoundaryService(),
_liveInstanceChangeHandler, _tableQueryQuotaManager);
- _accessControlFactory = brokerServerBuilder.getAccessControlFactory();
-
_helixExternalViewBasedRouting.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
-
_tableQueryQuotaManager.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
- brokerServerBuilder.start();
-
- LOGGER.info("Pinot broker ready and listening on port {} for API requests",
- config.getProperty("pinot.broker.client.queryPort"));
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- try {
- brokerServerBuilder.stop();
- } catch (final Exception e) {
- LOGGER.error("Caught exception while running shutdown hook", e);
- }
- }
- });
- return brokerServerBuilder;
- }
+ public void shutdown() {
+ LOGGER.info("Shutting down Pinot broker");
- /**
- * To be overridden to plug in custom external view change handlers.
- * <p>NOTE: all change handlers will be run in a single thread, so any slow
change handler can block other change
- * handlers from running. For slow change handler, make it asynchronous.
- *
- * @param spectatorHelixManager Spectator Helix manager
- * @return List of custom external view change handlers to plug in
- */
- @SuppressWarnings("unused")
- protected List<ClusterChangeHandler>
getCustomExternalViewChangeHandlers(HelixManager spectatorHelixManager) {
- return Collections.emptyList();
- }
+ if (_tbiMessageHandler != null) {
+ LOGGER.info("Shutting down time boundary info refresh message handler");
+ _tbiMessageHandler.shutdown();
+ }
- /**
- * To be overridden to plug in custom instance config change handlers.
- * <p>NOTE: all change handlers will be run in a single thread, so any slow
change handler can block other change
- * handlers from running. For slow change handler, make it asynchronous.
- *
- * @param spectatorHelixManager Spectator Helix manager
- * @return List of custom instance config change handlers to plug in
- */
- @SuppressWarnings("unused")
- protected List<ClusterChangeHandler>
getCustomInstanceConfigChangeHandlers(HelixManager spectatorHelixManager) {
- return Collections.emptyList();
- }
+ if (_participantHelixManager != null) {
+ LOGGER.info("Disconnecting participant Helix manager");
+ _participantHelixManager.disconnect();
+ }
- /**
- * To be overridden to plug in custom live instance change handlers.
- * <p>NOTE: all change handlers will be run in a single thread, so any slow
change handler can block other change
- * handlers from running. For slow change handler, make it asynchronous.
- *
- * @param spectatorHelixManager Spectator Helix manager
- * @return List of custom live instance change handlers to plug in
- */
- @SuppressWarnings("unused")
- protected List<ClusterChangeHandler>
getCustomLiveInstanceChangeHandlers(HelixManager spectatorHelixManager) {
- return Collections.emptyList();
+ if (_clusterChangeMediator != null) {
+ LOGGER.info("Stopping cluster change mediator");
+ _clusterChangeMediator.stop();
+ }
+
+ if (_brokerServerBuilder != null) {
+ LOGGER.info("Stopping broker server builder");
+ _brokerServerBuilder.stop();
+ }
+
+ if (_spectatorHelixManager != null) {
+ LOGGER.info("Disconnecting spectator Helix manager");
+ _spectatorHelixManager.disconnect();
+ }
+
+ LOGGER.info("Finish shutting down Pinot broker");
}
public AccessControlFactory getAccessControlFactory() {
@@ -283,42 +303,21 @@ public class HelixBrokerStarter {
return _brokerServerBuilder;
}
- public static HelixBrokerStarter startDefault()
+ public MetricsRegistry getMetricsRegistry() {
+ return _metricsRegistry;
+ }
+
+ public static HelixBrokerStarter getDefault()
throws Exception {
Configuration brokerConf = new BaseConfiguration();
int port = 5001;
brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT,
port);
brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS,
60 * 1000L);
- return new HelixBrokerStarter(null, "quickstart", "localhost:2122",
brokerConf);
- }
-
- public void shutdown() {
- LOGGER.info("Shutting down");
-
- if (_participantHelixManager != null) {
- LOGGER.info("Disconnecting participant Helix manager");
- _participantHelixManager.disconnect();
- }
-
- if (_spectatorHelixManager != null) {
- LOGGER.info("Disconnecting spectator Helix manager");
- _spectatorHelixManager.disconnect();
- }
-
- if (_tbiMessageHandler != null) {
- LOGGER.info("Shutting down timeboundary info refresh message handler");
- _tbiMessageHandler.shutdown();
- }
-
- _clusterChangeMediator.stop();
- }
-
- public MetricsRegistry getMetricsRegistry() {
- return _metricsRegistry;
+ return new HelixBrokerStarter(brokerConf, "quickstart", "localhost:2122");
}
public static void main(String[] args)
throws Exception {
- startDefault();
+ getDefault().start();
}
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
index 6bd1229..5162ce4 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
@@ -43,15 +43,17 @@ public class LiveInstanceChangeHandler implements
ClusterChangeHandler {
private static final boolean DO_NOT_RECREATE = false;
- private final HelixDataAccessor _helixDataAccessor;
- private final PropertyKey _liveInstancesKey;
+ private HelixDataAccessor _helixDataAccessor;
+ private PropertyKey _liveInstancesKey;
private KeyedPool<PooledNettyClientResourceManager.PooledClientConnection>
_connectionPool;
private Map<String, String> _liveInstanceToSessionIdMap;
- public LiveInstanceChangeHandler(HelixManager helixManager) {
+ @Override
+ public void init(HelixManager helixManager) {
+ Preconditions.checkState(_helixDataAccessor == null,
"LiveInstanceChangeHandler is already initialized");
_helixDataAccessor = helixManager.getHelixDataAccessor();
- _liveInstancesKey = new
PropertyKey.Builder(helixManager.getClusterName()).liveInstances();
+ _liveInstancesKey = _helixDataAccessor.keyBuilder().liveInstances();
}
public void
init(KeyedPool<PooledNettyClientResourceManager.PooledClientConnection>
connectionPool) {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
index 13f24b8..1a685d2 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
@@ -48,17 +48,25 @@ import static
org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
public class TableQueryQuotaManager implements ClusterChangeHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(TableQueryQuotaManager.class);
+ private static final int TIME_RANGE_IN_SECOND = 1;
+
+ private final AtomicInteger _lastKnownBrokerResourceVersion = new
AtomicInteger(-1);
+ private final Map<String, QueryQuotaConfig> _rateLimiterMap = new
ConcurrentHashMap<>();
+ private HelixManager _helixManager;
private BrokerMetrics _brokerMetrics;
- private final HelixManager _helixManager;
- private final AtomicInteger _lastKnownBrokerResourceVersion;
- private final Map<String, QueryQuotaConfig> _rateLimiterMap;
- private static final int TIME_RANGE_IN_SECOND = 1;
- public TableQueryQuotaManager(HelixManager helixManager) {
+ @Override
+ public void init(HelixManager helixManager) {
+ Preconditions.checkState(_helixManager == null, "TableQueryQuotaManager is
already initialized");
_helixManager = helixManager;
- _rateLimiterMap = new ConcurrentHashMap<>();
- _lastKnownBrokerResourceVersion = new AtomicInteger();
+ }
+
+ @Override
+ public void processClusterChange(HelixConstants.ChangeType changeType) {
+ Preconditions
+ .checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW,
"Illegal change type: " + changeType);
+ processQueryQuotaChange();
}
/**
@@ -330,11 +338,4 @@ public class TableQueryQuotaManager implements
ClusterChangeHandler {
.info("Processed query quota change in {}ms, {} out of {} query quota
configs rebuilt.", (endTime - startTime),
numRebuilt, _rateLimiterMap.size());
}
-
- @Override
- public void processClusterChange(HelixConstants.ChangeType changeType) {
- Preconditions
- .checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW,
"Illegal change type: " + changeType);
- processQueryQuotaChange();
- }
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java
index 51a9556..f2e3f08 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java
@@ -69,37 +69,49 @@ import org.slf4j.LoggerFactory;
*/
public class HelixExternalViewBasedRouting implements ClusterChangeHandler,
RoutingTable {
private static final Logger LOGGER =
LoggerFactory.getLogger(HelixExternalViewBasedRouting.class);
+ private static final int INVALID_EXTERNAL_VIEW_VERSION = Integer.MIN_VALUE;
- private final Map<String, RoutingTableBuilder> _routingTableBuilderMap;
-
+ private final Map<String, RoutingTableBuilder> _routingTableBuilderMap = new
ConcurrentHashMap<>();
private final Map<String, Integer> _lastKnownExternalViewVersionMap = new
ConcurrentHashMap<>();
private final Map<String, Map<String, InstanceConfig>>
_lastKnownInstanceConfigsForTable = new ConcurrentHashMap<>();
private final Map<String, InstanceConfig> _lastKnownInstanceConfigs = new
ConcurrentHashMap<>();
private final Map<String, Set<String>> _tablesForInstance = new
ConcurrentHashMap<>();
private final Map<String, SegmentSelector> _segmentSelectorMap = new
ConcurrentHashMap<>();
- private final HelixExternalViewBasedTimeBoundaryService _timeBoundaryService;
- private final HelixManager _helixManager;
- private static final int INVALID_EXTERNAL_VIEW_VERSION = Integer.MIN_VALUE;
-
- private BrokerMetrics _brokerMetrics;
-
- private Configuration _configuration;
+ private final Configuration _configuration;
+ private HelixManager _helixManager;
+ private HelixExternalViewBasedTimeBoundaryService _timeBoundaryService;
private RoutingTableBuilderFactory _routingTableBuilderFactory;
private SegmentSelectorProvider _segmentSelectorProvider;
+ private BrokerMetrics _brokerMetrics;
- public HelixExternalViewBasedRouting(ZkHelixPropertyStore<ZNRecord>
propertyStore, HelixManager helixManager,
- Configuration configuration) {
+ public HelixExternalViewBasedRouting(Configuration configuration) {
_configuration = configuration;
- _timeBoundaryService = new
HelixExternalViewBasedTimeBoundaryService(propertyStore);
- _routingTableBuilderMap = new HashMap<>();
+ }
+
+ @Override
+ public void init(HelixManager helixManager) {
+ Preconditions.checkState(_helixManager == null,
"HelixExternalViewBasedRouting is already initialized");
_helixManager = helixManager;
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
_helixManager.getHelixPropertyStore();
+ _timeBoundaryService = new
HelixExternalViewBasedTimeBoundaryService(propertyStore);
_routingTableBuilderFactory = new
RoutingTableBuilderFactory(_configuration, propertyStore);
_segmentSelectorProvider = new SegmentSelectorProvider(propertyStore);
}
@Override
+ public void processClusterChange(HelixConstants.ChangeType changeType) {
+ Preconditions.checkState(changeType ==
HelixConstants.ChangeType.EXTERNAL_VIEW
+ || changeType == HelixConstants.ChangeType.INSTANCE_CONFIG, "Illegal
change type: " + changeType);
+ if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) {
+ processExternalViewChange();
+ } else {
+ processInstanceConfigChange();
+ }
+ }
+
+ @Override
public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest
request) {
String tableName = request.getTableName();
RoutingTableBuilder routingTableBuilder =
_routingTableBuilderMap.get(tableName);
@@ -601,15 +613,4 @@ public class HelixExternalViewBasedRouting implements ClusterChangeHandler, Rout
return JsonUtils.objectToPrettyString(ret);
}
-
- @Override
- public void processClusterChange(HelixConstants.ChangeType changeType) {
- Preconditions.checkState(changeType ==
HelixConstants.ChangeType.EXTERNAL_VIEW
- || changeType == HelixConstants.ChangeType.INSTANCE_CONFIG, "Illegal
change type: " + changeType);
- if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) {
- processExternalViewChange();
- } else {
- processInstanceConfigChange();
- }
- }
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index b3a0452..1dfa1d0 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -74,7 +74,8 @@ public class HelixBrokerStarterTest extends ControllerTest {
_brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT,
8943);
_brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL,
100L);
- _helixBrokerStarter = new HelixBrokerStarter(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR, _brokerConf);
+ _helixBrokerStarter = new HelixBrokerStarter(_brokerConf,
getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR);
+ _helixBrokerStarter.start();
ControllerRequestBuilderUtil
.addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR, 5, true);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java
index 1b5d709..3e5eae2 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java
@@ -62,7 +62,8 @@ public class TableQueryQuotaManagerTest {
_helixManager = initHelixManager(helixClusterName);
_testPropertyStore = _helixManager.getHelixPropertyStore();
- _tableQueryQuotaManager = new TableQueryQuotaManager(_helixManager);
+ _tableQueryQuotaManager = new TableQueryQuotaManager();
+ _tableQueryQuotaManager.init(_helixManager);
}
private HelixManager initHelixManager(String helixClusterName) {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RandomRoutingTableTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RandomRoutingTableTest.java
index ac02919..5b5cdd6 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RandomRoutingTableTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RandomRoutingTableTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.io.IOUtils;
+import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.model.ExternalView;
@@ -35,6 +36,7 @@ import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableConfig.Builder;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -58,7 +60,8 @@ public class RandomRoutingTableTest {
int numSegmentsInEV = externalView.getPartitionSet().size();
int numServersInEV = instanceConfigs.size();
- HelixExternalViewBasedRouting routing = new
HelixExternalViewBasedRouting(null, null, new BaseConfiguration());
+ HelixExternalViewBasedRouting routing = new
HelixExternalViewBasedRouting(new BaseConfiguration());
+ routing.init(Mockito.mock(HelixManager.class));
routing.markDataResourceOnline(generateTableConfig(tableName),
externalView, instanceConfigs);
for (int i = 0; i < NUM_ROUNDS; i++) {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RoutingTableTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RoutingTableTest.java
index c89b1c2..e583799 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RoutingTableTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RoutingTableTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.helix.HelixManager;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import
org.apache.pinot.broker.routing.builder.HighLevelConsumerBasedRoutingTableBuilder;
@@ -41,6 +42,7 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.common.utils.HLCSegmentName;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -51,7 +53,8 @@ public class RoutingTableTest {
@Test
public void testHelixExternalViewBasedRoutingTable()
throws Exception {
- HelixExternalViewBasedRouting routingTable = new
HelixExternalViewBasedRouting(null, null, new BaseConfiguration());
+ HelixExternalViewBasedRouting routingTable = new
HelixExternalViewBasedRouting(new BaseConfiguration());
+ routingTable.init(Mockito.mock(HelixManager.class));
ExternalView externalView = new ExternalView("testResource0_OFFLINE");
externalView.setState("segment0", "dataServer_instance_0", "ONLINE");
@@ -113,20 +116,22 @@ public class RoutingTableTest {
final MutableBoolean timeBoundaryUpdated = new MutableBoolean(false);
- HelixExternalViewBasedRouting routingTable =
- new HelixExternalViewBasedRouting(propertyStore, null, new
BaseConfiguration()) {
- @Override
- protected ExternalView fetchExternalView(String table) {
- return offlineExternalView;
- }
+ HelixExternalViewBasedRouting routingTable = new
HelixExternalViewBasedRouting(new BaseConfiguration()) {
+ @Override
+ protected ExternalView fetchExternalView(String table) {
+ return offlineExternalView;
+ }
- @Override
- protected void updateTimeBoundary(String tableName, ExternalView
externalView) {
- if (tableName.equals("myTable_OFFLINE")) {
- timeBoundaryUpdated.setValue(true);
- }
- }
- };
+ @Override
+ protected void updateTimeBoundary(String tableName, ExternalView
externalView) {
+ if (tableName.equals("myTable_OFFLINE")) {
+ timeBoundaryUpdated.setValue(true);
+ }
+ }
+ };
+ HelixManager helixManager = Mockito.mock(HelixManager.class);
+
Mockito.when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+ routingTable.init(helixManager);
routingTable.setBrokerMetrics(new BrokerMetrics(new MetricsRegistry()));
Assert.assertFalse(timeBoundaryUpdated.booleanValue());
@@ -166,7 +171,8 @@ public class RoutingTableTest {
final LLCSegmentName llcSegmentName = new LLCSegmentName("testResource0",
2, 65, System.currentTimeMillis());
- HelixExternalViewBasedRouting routingTable = new
HelixExternalViewBasedRouting(null, null, new BaseConfiguration());
+ HelixExternalViewBasedRouting routingTable = new
HelixExternalViewBasedRouting(new BaseConfiguration());
+ routingTable.init(Mockito.mock(HelixManager.class));
Field realtimeRTBField =
HelixExternalViewBasedRouting.class.getDeclaredField("_realtimeHLCRoutingTableBuilder");
realtimeRTBField.setAccessible(true);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 62ed53f..8ed7999 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -29,6 +29,8 @@ public class CommonConstants {
public static class Helix {
public static final String IS_SHUTDOWN_IN_PROGRESS = "shutdownInProgress";
+ public static final String INSTANCE_CONNECTED_METRIC_NAME =
"helix.connected";
+
public static final String PREFIX_OF_SERVER_INSTANCE = "Server_";
public static final String PREFIX_OF_BROKER_INSTANCE = "Broker_";
public static final String PREFIX_OF_CONTROLLER_INSTANCE = "Controller_";
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 8f57c11..4a900c6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -219,7 +219,8 @@ public class ControllerStarter {
.setup(_helixClusterName, _helixZkURL, _instanceId,
_isUpdateStateModel, _enableBatchMessageMode);
// Emit helix controller metrics
- _controllerMetrics.addCallbackGauge("helix.connected", () ->
_helixControllerManager.isConnected() ? 1L : 0L);
+
_controllerMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME,
+ () -> _helixControllerManager.isConnected() ? 1L : 0L);
_controllerMetrics.addCallbackGauge("helix.leader", () ->
_helixControllerManager.isLeader() ? 1L : 0L);
_helixControllerManager.addPreConnectCallback(
() ->
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.HELIX_ZOOKEEPER_RECONNECTS,
1L));
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index d127779..d671311 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -127,7 +127,9 @@ public abstract class ClusterTest extends ControllerTest {
BrokerServerBuilder.SINGLE_CONNECTION_REQUEST_HANDLER_TYPE);
}
overrideBrokerConf(brokerConf);
- _brokerStarters.add(new HelixBrokerStarter(_clusterName, zkStr,
brokerConf));
+ HelixBrokerStarter brokerStarter = new HelixBrokerStarter(brokerConf,
_clusterName, zkStr);
+ brokerStarter.start();
+ _brokerStarters.add(brokerStarter);
}
}
@@ -225,8 +227,7 @@ public abstract class ClusterTest extends ControllerTest {
protected void stopBroker() {
for (HelixBrokerStarter brokerStarter : _brokerStarters) {
try {
- // TODO: replace with brokerStarter.shutdown() once they are hooked up
- brokerStarter.getBrokerServerBuilder().stop();
+ brokerStarter.shutdown();
} catch (Exception e) {
LOGGER.error("Encountered exception while stopping broker {}",
e.getMessage());
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index 64f7b32..5d3ed26 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -174,7 +174,8 @@ public class HelixServerStarter {
_helixManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
messageHandlerFactory);
- serverMetrics.addCallbackGauge("helix.connected", () ->
_helixManager.isConnected() ? 1L : 0L);
+
serverMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME,
+ () -> _helixManager.isConnected() ? 1L : 0L);
_helixManager
.addPreConnectCallback(() ->
serverMetrics.addMeteredGlobalValue(ServerMeter.HELIX_ZOOKEEPER_RECONNECTS,
1L));
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
index 52b046f..1e6af76 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
@@ -41,7 +41,6 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
@Option(name = "-brokerPort", required = false, metaVar = "<int>", usage =
"Broker port number to use for query.")
private int _brokerPort = CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT;
- ;
@Option(name = "-zkAddress", required = false, metaVar = "<http>", usage =
"HTTP address of Zookeeper.")
private String _zkAddress = DEFAULT_ZK_ADDRESS;
@@ -59,6 +58,8 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
return _help;
}
+ private HelixBrokerStarter _brokerStarter;
+
@Override
public String getName() {
return "StartBroker";
@@ -75,7 +76,9 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
@Override
public void cleanup() {
-
+ if (_brokerStarter != null) {
+ _brokerStarter.shutdown();
+ }
}
@Override
@@ -119,7 +122,8 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
}
LOGGER.info("Executing command: " + toString());
- new HelixBrokerStarter(_brokerHost, _clusterName, _zkAddress,
brokerConf);
+ _brokerStarter = new HelixBrokerStarter(brokerConf, _clusterName,
_zkAddress, _brokerHost);
+ _brokerStarter.start();
String pidFile = ".pinotAdminBroker-" + System.currentTimeMillis() +
".pid";
savePID(System.getProperty("java.io.tmpdir") + File.separator + pidFile);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
index 1df27e5..6d7928d 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
@@ -221,7 +221,7 @@ public class PerfBenchmarkDriver {
brokerConf.setProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
brokerInstanceName);
brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS,
BROKER_TIMEOUT_MS);
LOGGER.info("Starting broker instance: {}", brokerInstanceName);
- new HelixBrokerStarter(_clusterName, _zkAddress, brokerConf);
+ new HelixBrokerStarter(brokerConf, _clusterName, _zkAddress).start();
}
private void startServer()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]