This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch broker_starter in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 1b537cb655339932b25f48001fa3b0b8f9e30a8f Author: Jackie (Xiaotian) Jiang <[email protected]> AuthorDate: Tue Apr 9 17:59:14 2019 -0700 Refactor HelixBrokerStarter to separate constructor and start() For the starter class, constructor should only set the properties, but not start the service, which should be done by start() instead. - Refactor HelixBrokerStarter to separate constructor and start() - Remove redundant configs, always use constant for configs - Modify shutdown() to shut down all components properly NOTE: THIS IS A BACKWARD-INCOMPATIBLE CHANGE - Need to call start() separately in order to start the broker - Order of arguments changed in constructor (move nullable to the last. This is intentional so that users can find the incompatibility more easily during compilation, and thus add the start() call) --- .../pinot/broker/broker/BrokerServerBuilder.java | 7 +- .../broker/broker/helix/ClusterChangeHandler.java | 4 + .../broker/helix/DefaultHelixBrokerConfig.java | 59 ----- .../broker/broker/helix/HelixBrokerStarter.java | 288 +++++++++------------ .../pinot/broker/broker/BrokerTestUtils.java | 51 ---- .../broker/broker/HelixBrokerStarterTest.java | 34 +-- .../broker/broker/HelixBrokerStarterUtilsTest.java | 63 ----- .../apache/pinot/common/utils/CommonConstants.java | 4 + .../pinot/integration/tests/ClusterTest.java | 37 +-- .../tests/NewConfigApplyIntegrationTest.java | 3 +- .../tools/admin/command/StartBrokerCommand.java | 22 +- .../pinot/tools/perf/PerfBenchmarkDriver.java | 2 +- 12 files changed, 188 insertions(+), 386 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java index 387e214..a63e954 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java @@ -109,11 +109,12 @@ public class 
BrokerServerBuilder { _state.set(State.STARTING); _brokerRequestHandler.start(); - _brokerAdminApplication.start(_config - .getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT)); + int brokerQueryPort = + _config.getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT); + _brokerAdminApplication.start(brokerQueryPort); _state.set(State.RUNNING); - LOGGER.info("Pinot Broker started"); + LOGGER.info("Pinot Broker is started and listening on port {} for API requests", brokerQueryPort); } public void stop() { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java index d4b82da..69a4524 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java @@ -19,11 +19,15 @@ package org.apache.pinot.broker.broker.helix; import org.apache.helix.HelixConstants; +import org.apache.pinot.annotations.InterfaceAudience; +import org.apache.pinot.annotations.InterfaceStability; /** * Handles cluster changes such as external view changes, instance config changes, live instance changes etc. */ [email protected] [email protected] public interface ClusterChangeHandler { /** diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/DefaultHelixBrokerConfig.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/DefaultHelixBrokerConfig.java deleted file mode 100644 index c264598..0000000 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/DefaultHelixBrokerConfig.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.broker.broker.helix; - -import java.util.Iterator; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.PropertiesConfiguration; - - -public class DefaultHelixBrokerConfig { - public static final String HELIX_FLAPPING_TIME_WINDOW_NAME = "pinot.broker.helix.flappingTimeWindowMs"; - public static final String DEFAULT_HELIX_FLAPPING_TIMEIWINDWOW_MS = "0"; - - public static Configuration getDefaultBrokerConf() { - Configuration brokerConf = new PropertiesConfiguration(); - - // config based routing - brokerConf.addProperty("pinot.broker.transport.routingMode", "HELIX"); - - brokerConf.addProperty("pinot.broker.routing.table.builder.default.offline.class", "balanced"); - brokerConf.addProperty("pinot.broker.routing.table.builder.default.offline.numOfRoutingTables", "10"); - brokerConf.addProperty("pinot.broker.routing.table.builder.tables", ""); - - //client properties - brokerConf.addProperty("pinot.broker.client.queryPort", "8099"); - - // [PINOT-2435] setting to 0 so it doesn't disconnect from zk - brokerConf.addProperty("pinot.broker.helix.flappingTimeWindowMs", "0"); - - return brokerConf; - } - - public static Configuration getDefaultBrokerConf(Configuration externalConfigs) { - final Configuration 
defaultConfigs = getDefaultBrokerConf(); - @SuppressWarnings("unchecked") - Iterator<String> iterable = externalConfigs.getKeys(); - while (iterable.hasNext()) { - String key = iterable.next(); - defaultConfigs.setProperty(key, externalConfigs.getProperty(key)); - } - return defaultConfigs; - } -} diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java index 337a6c3..8f98b0d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java @@ -25,15 +25,16 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.lang.StringUtils; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixConstants.ChangeType; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; +import org.apache.helix.SystemPropertyKeys; import org.apache.helix.ZNRecord; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Message; @@ -47,189 +48,196 @@ import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting; import org.apache.pinot.common.config.TagNameUtils; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.NetUtil; import org.apache.pinot.common.utils.ServiceStatus; -import org.apache.pinot.common.utils.StringUtil; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; -/** - * Helix Broker Startable - * - * - */ public class HelixBrokerStarter { - private static final String PROPERTY_STORE = "PROPERTYSTORE"; + private static final Logger LOGGER = LoggerFactory.getLogger(HelixBrokerStarter.class); + private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = "pinot.broker.routing.table"; + + private final Configuration _properties; + private final String _clusterName; + private final String _zkServers; + private final String _brokerId; // Spectator Helix manager handles the custom change listeners, properties read/write - private final HelixManager _spectatorHelixManager; + private HelixManager _spectatorHelixManager; // Participant Helix manager handles Helix functionality such as state transitions and messages - private final HelixManager _participantHelixManager; - - private final Configuration _pinotHelixProperties; - private final HelixAdmin _helixAdmin; - private final ZkHelixPropertyStore<ZNRecord> _propertyStore; - private final HelixDataAccessor _helixDataAccessor; - private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting; - private final BrokerServerBuilder _brokerServerBuilder; - private final LiveInstanceChangeHandler _liveInstanceChangeHandler; - private final MetricsRegistry _metricsRegistry; - private final TableQueryQuotaManager _tableQueryQuotaManager; - private final ClusterChangeMediator _clusterChangeMediator; - private final TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler; - - // Set after broker is started, which is actually in the constructor. 
- private AccessControlFactory _accessControlFactory; + private HelixManager _participantHelixManager; - private static final Logger LOGGER = LoggerFactory.getLogger(HelixBrokerStarter.class); - - private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = "pinot.broker.routing.table"; + private HelixAdmin _helixAdmin; + private ZkHelixPropertyStore<ZNRecord> _propertyStore; + private HelixDataAccessor _helixDataAccessor; + private HelixExternalViewBasedRouting _helixExternalViewBasedRouting; + private BrokerServerBuilder _brokerServerBuilder; + private AccessControlFactory _accessControlFactory; + private MetricsRegistry _metricsRegistry; + private ClusterChangeMediator _clusterChangeMediator; + private TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler; - public HelixBrokerStarter(String helixClusterName, String zkServer, Configuration pinotHelixProperties) + public HelixBrokerStarter(Configuration properties, String clusterName, String zkServer) throws Exception { - this(null, helixClusterName, zkServer, pinotHelixProperties); + this(properties, clusterName, zkServer, null); } - public HelixBrokerStarter(String brokerHost, String helixClusterName, String zkServer, - Configuration pinotHelixProperties) + public HelixBrokerStarter(Configuration properties, String clusterName, String zkServer, @Nullable String brokerHost) throws Exception { - LOGGER.info("Starting Pinot broker"); + _properties = properties; + setupHelixSystemProperties(); - _pinotHelixProperties = DefaultHelixBrokerConfig.getDefaultBrokerConf(pinotHelixProperties); + _clusterName = clusterName; + + // Remove all white-spaces from the list of zkServers (if any). 
+ _zkServers = zkServer.replaceAll("\\s+", ""); if (brokerHost == null) { brokerHost = NetUtil.getHostAddress(); } - - final String brokerId = _pinotHelixProperties.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY, - CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" + _pinotHelixProperties + _brokerId = _properties.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY, + CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" + _properties .getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT)); + _properties.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID, _brokerId); + } - _pinotHelixProperties.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID, brokerId); - setupHelixSystemProperties(); + private void setupHelixSystemProperties() { + System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _properties + .getString(CommonConstants.Broker.CONFIG_OF_HELIX_FLAPPING_TIME_WINDOW_MS, + CommonConstants.Broker.DEFAULT_HELIX_FLAPPING_TIME_WINDOW_MS)); + } - // Remove all white-spaces from the list of zkServers (if any). - String zkServers = zkServer.replaceAll("\\s+", ""); + public void start() + throws Exception { + LOGGER.info("Starting Pinot broker"); - LOGGER.info("Connecting Helix components"); - // Connect spectator Helix manager. 
+ // Connect the spectator Helix manager + LOGGER.info("Connecting spectator Helix manager"); _spectatorHelixManager = - HelixManagerFactory.getZKHelixManager(helixClusterName, brokerId, InstanceType.SPECTATOR, zkServers); + HelixManagerFactory.getZKHelixManager(_clusterName, _brokerId, InstanceType.SPECTATOR, _zkServers); _spectatorHelixManager.connect(); _helixAdmin = _spectatorHelixManager.getClusterManagmentTool(); _propertyStore = _spectatorHelixManager.getHelixPropertyStore(); _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor(); _helixExternalViewBasedRouting = new HelixExternalViewBasedRouting(_propertyStore, _spectatorHelixManager, - pinotHelixProperties.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY)); - _tableQueryQuotaManager = new TableQueryQuotaManager(_spectatorHelixManager); - _liveInstanceChangeHandler = new LiveInstanceChangeHandler(_spectatorHelixManager); - _brokerServerBuilder = startBroker(_pinotHelixProperties); + _properties.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY)); + TableQueryQuotaManager tableQueryQuotaManager = new TableQueryQuotaManager(_spectatorHelixManager); + LiveInstanceChangeHandler liveInstanceChangeHandler = new LiveInstanceChangeHandler(_spectatorHelixManager); + + // Set up the broker server builder + LOGGER.info("Setting up broker server builder"); + _brokerServerBuilder = new BrokerServerBuilder(_properties, _helixExternalViewBasedRouting, + _helixExternalViewBasedRouting.getTimeBoundaryService(), liveInstanceChangeHandler, tableQueryQuotaManager); + _accessControlFactory = _brokerServerBuilder.getAccessControlFactory(); _metricsRegistry = _brokerServerBuilder.getMetricsRegistry(); + BrokerMetrics brokerMetrics = _brokerServerBuilder.getBrokerMetrics(); + _helixExternalViewBasedRouting.setBrokerMetrics(brokerMetrics); + tableQueryQuotaManager.setBrokerMetrics(brokerMetrics); + _brokerServerBuilder.start(); - // Initialize cluster change mediator + // Initialize the cluster change mediator + LOGGER.info("Initializing 
cluster change mediator"); Map<ChangeType, List<ClusterChangeHandler>> changeHandlersMap = new HashMap<>(); List<ClusterChangeHandler> externalViewChangeHandlers = new ArrayList<>(); externalViewChangeHandlers.add(_helixExternalViewBasedRouting); - externalViewChangeHandlers.add(_tableQueryQuotaManager); + externalViewChangeHandlers.add(tableQueryQuotaManager); externalViewChangeHandlers.addAll(getCustomExternalViewChangeHandlers(_spectatorHelixManager)); changeHandlersMap.put(ChangeType.EXTERNAL_VIEW, externalViewChangeHandlers); List<ClusterChangeHandler> instanceConfigChangeHandlers = new ArrayList<>(); instanceConfigChangeHandlers.add(_helixExternalViewBasedRouting); instanceConfigChangeHandlers.addAll(getCustomInstanceConfigChangeHandlers(_spectatorHelixManager)); changeHandlersMap.put(ChangeType.INSTANCE_CONFIG, instanceConfigChangeHandlers); - List<ClusterChangeHandler> liveInstanceChangeHandler = new ArrayList<>(); - liveInstanceChangeHandler.add(_liveInstanceChangeHandler); - liveInstanceChangeHandler.addAll(getCustomLiveInstanceChangeHandlers(_spectatorHelixManager)); - changeHandlersMap.put(ChangeType.LIVE_INSTANCE, liveInstanceChangeHandler); - _clusterChangeMediator = new ClusterChangeMediator(changeHandlersMap, _brokerServerBuilder.getBrokerMetrics()); + List<ClusterChangeHandler> liveInstanceChangeHandlers = new ArrayList<>(); + liveInstanceChangeHandlers.add(liveInstanceChangeHandler); + liveInstanceChangeHandlers.addAll(getCustomLiveInstanceChangeHandlers(_spectatorHelixManager)); + changeHandlersMap.put(ChangeType.LIVE_INSTANCE, liveInstanceChangeHandlers); + _clusterChangeMediator = new ClusterChangeMediator(changeHandlersMap, brokerMetrics); _clusterChangeMediator.start(); _spectatorHelixManager.addExternalViewChangeListener(_clusterChangeMediator); _spectatorHelixManager.addInstanceConfigChangeListener(_clusterChangeMediator); _spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator); - // Connect participant Helix manager. 
+ // Connect the participant Helix manager + LOGGER.info("Connecting participant Helix manager"); _participantHelixManager = - HelixManagerFactory.getZKHelixManager(helixClusterName, brokerId, InstanceType.PARTICIPANT, zkServers); + HelixManagerFactory.getZKHelixManager(_clusterName, _brokerId, InstanceType.PARTICIPANT, _zkServers); StateMachineEngine stateMachineEngine = _participantHelixManager.getStateMachineEngine(); StateModelFactory<?> stateModelFactory = new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, _helixDataAccessor, - _helixExternalViewBasedRouting, _tableQueryQuotaManager); + _helixExternalViewBasedRouting, tableQueryQuotaManager); stateMachineEngine .registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(), stateModelFactory); _participantHelixManager.connect(); - _tbiMessageHandler = new TimeboundaryRefreshMessageHandlerFactory(_helixExternalViewBasedRouting, - _pinotHelixProperties.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, + _tbiMessageHandler = new TimeboundaryRefreshMessageHandlerFactory(_helixExternalViewBasedRouting, _properties + .getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, CommonConstants.Broker.DEFAULT_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL_MS)); _participantHelixManager.getMessagingService() .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), _tbiMessageHandler); - - addInstanceTagIfNeeded(helixClusterName, brokerId); + addInstanceTagIfNeeded(); + brokerMetrics.addCallbackGauge("helix.connected", () -> _participantHelixManager.isConnected() ? 
1L : 0L); + _participantHelixManager + .addPreConnectCallback(() -> brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L)); // Register the service status handler - double minResourcePercentForStartup = _pinotHelixProperties + LOGGER.info("Registering service status handler"); + double minResourcePercentForStartup = _properties .getDouble(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START, CommonConstants.Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START); ServiceStatus.setServiceStatusCallback(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList .of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_participantHelixManager, - helixClusterName, brokerId, minResourcePercentForStartup), + _clusterName, _brokerId, minResourcePercentForStartup), new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_participantHelixManager, - helixClusterName, brokerId, minResourcePercentForStartup)))); - - _brokerServerBuilder.getBrokerMetrics() - .addCallbackGauge("helix.connected", () -> _participantHelixManager.isConnected() ? 
1L : 0L); + _clusterName, _brokerId, minResourcePercentForStartup)))); - _participantHelixManager.addPreConnectCallback(() -> _brokerServerBuilder.getBrokerMetrics() - .addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L)); + LOGGER.info("Finish starting Pinot broker"); } - private void setupHelixSystemProperties() { - final String helixFlappingTimeWindowPropName = "helixmanager.flappingTimeWindow"; - System.setProperty(helixFlappingTimeWindowPropName, _pinotHelixProperties - .getString(DefaultHelixBrokerConfig.HELIX_FLAPPING_TIME_WINDOW_NAME, - DefaultHelixBrokerConfig.DEFAULT_HELIX_FLAPPING_TIMEIWINDWOW_MS)); - } - - private void addInstanceTagIfNeeded(String clusterName, String instanceName) { + private void addInstanceTagIfNeeded() { InstanceConfig instanceConfig = - _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().instanceConfig(instanceName)); + _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().instanceConfig(_brokerId)); List<String> instanceTags = instanceConfig.getTags(); if (instanceTags == null || instanceTags.isEmpty()) { if (ZKMetadataProvider.getClusterTenantIsolationEnabled(_propertyStore)) { - _helixAdmin.addInstanceTag(clusterName, instanceName, + _helixAdmin.addInstanceTag(_clusterName, _brokerId, TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME)); } else { - _helixAdmin.addInstanceTag(clusterName, instanceName, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE); + _helixAdmin.addInstanceTag(_clusterName, _brokerId, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE); } } } - private BrokerServerBuilder startBroker(Configuration config) { - if (config == null) { - config = DefaultHelixBrokerConfig.getDefaultBrokerConf(); + public void shutdown() { + LOGGER.info("Shutting down Pinot broker"); + + if (_tbiMessageHandler != null) { + LOGGER.info("Shutting down time boundary info refresh message handler"); + _tbiMessageHandler.shutdown(); } - BrokerServerBuilder brokerServerBuilder = new 
BrokerServerBuilder(config, _helixExternalViewBasedRouting, - _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstanceChangeHandler, _tableQueryQuotaManager); - _accessControlFactory = brokerServerBuilder.getAccessControlFactory(); - _helixExternalViewBasedRouting.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics()); - _tableQueryQuotaManager.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics()); - brokerServerBuilder.start(); - - LOGGER.info("Pinot broker ready and listening on port {} for API requests", - config.getProperty("pinot.broker.client.queryPort")); - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - brokerServerBuilder.stop(); - } catch (final Exception e) { - LOGGER.error("Caught exception while running shutdown hook", e); - } - } - }); - return brokerServerBuilder; + + if (_participantHelixManager != null) { + LOGGER.info("Disconnecting participant Helix manager"); + _participantHelixManager.disconnect(); + } + + if (_clusterChangeMediator != null) { + LOGGER.info("Stopping cluster change mediator"); + _clusterChangeMediator.stop(); + } + + if (_brokerServerBuilder != null) { + LOGGER.info("Stopping broker server builder"); + _brokerServerBuilder.stop(); + } + + if (_spectatorHelixManager != null) { + LOGGER.info("Disconnecting spectator Helix manager"); + _spectatorHelixManager.disconnect(); + } + + LOGGER.info("Finish shutting down Pinot broker"); } /** @@ -275,32 +283,6 @@ public class HelixBrokerStarter { return _accessControlFactory; } - /** - * The zk string format should be 127.0.0.1:3000,127.0.0.1:3001/app/a which applies - * the /helixClusterName/PROPERTY_STORE after chroot to all servers. 
- * Expected output for this method is: - * 127.0.0.1:3000/app/a/helixClusterName/PROPERTY_STORE,127.0.0.1:3001/app/a/helixClusterName/PROPERTY_STORE - * - * @param zkServers - * @param helixClusterName - * @return the full property store path - * - * @see org.apache.zookeeper.ZooKeeper#ZooKeeper(String, int, org.apache.zookeeper.Watcher) - */ - public static String getZkAddressForBroker(String zkServers, String helixClusterName) { - List tokens = new ArrayList<String>(); - String[] zkSplit = zkServers.split("/", 2); - String zkHosts = zkSplit[0]; - String zkPathSuffix = StringUtil.join("/", helixClusterName, PROPERTY_STORE); - if (zkSplit.length > 1) { - zkPathSuffix = zkSplit[1] + "/" + zkPathSuffix; - } - for (String token : zkHosts.split(",")) { - tokens.add(StringUtil.join("/", StringUtils.chomp(token, "/"), zkPathSuffix)); - } - return StringUtils.join(tokens, ","); - } - public HelixManager getSpectatorHelixManager() { return _spectatorHelixManager; } @@ -313,45 +295,21 @@ public class HelixBrokerStarter { return _brokerServerBuilder; } - public static HelixBrokerStarter startDefault() - throws Exception { - Configuration configuration = new PropertiesConfiguration(); - int port = 5001; - configuration.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, port); - configuration.addProperty("pinot.broker.timeoutMs", 500 * 1000L); - - final HelixBrokerStarter pinotHelixBrokerStarter = - new HelixBrokerStarter(null, "quickstart", "localhost:2122", configuration); - return pinotHelixBrokerStarter; - } - - public void shutdown() { - LOGGER.info("Shutting down"); - - if (_participantHelixManager != null) { - LOGGER.info("Disconnecting participant Helix manager"); - _participantHelixManager.disconnect(); - } - - if (_spectatorHelixManager != null) { - LOGGER.info("Disconnecting spectator Helix manager"); - _spectatorHelixManager.disconnect(); - } - - if (_tbiMessageHandler != null) { - LOGGER.info("Shutting down timeboundary info refresh message handler"); - 
_tbiMessageHandler.shutdown(); - } - - _clusterChangeMediator.stop(); - } - public MetricsRegistry getMetricsRegistry() { return _metricsRegistry; } + public static HelixBrokerStarter getDefault() + throws Exception { + Configuration properties = new PropertiesConfiguration(); + int port = 5001; + properties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, port); + properties.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 30 * 1000L); + return new HelixBrokerStarter(properties, "quickstart", "localhost:2122"); + } + public static void main(String[] args) throws Exception { - startDefault(); + getDefault().start(); } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerTestUtils.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerTestUtils.java deleted file mode 100644 index f863e56..0000000 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerTestUtils.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.broker.broker; - -import org.apache.commons.configuration.Configuration; -import org.apache.pinot.broker.broker.helix.DefaultHelixBrokerConfig; -import org.apache.pinot.broker.broker.helix.HelixBrokerStarter; - - -/** - * Utilities to start a broker during unit tests. - * - */ -public class BrokerTestUtils { - public static Configuration getDefaultBrokerConfiguration() { - return DefaultHelixBrokerConfig.getDefaultBrokerConf(); - } - - public static HelixBrokerStarter startBroker(final String clusterName, final String zkStr, - final Configuration configuration) { - try { - return new HelixBrokerStarter(clusterName, zkStr, configuration); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static void stopBroker(final HelixBrokerStarter brokerStarter) { - try { - brokerStarter.getBrokerServerBuilder().stop(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java index 08387a8..7436a89 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java @@ -30,9 +30,9 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.ZkClient; import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; -import org.apache.pinot.broker.broker.helix.DefaultHelixBrokerConfig; import org.apache.pinot.broker.broker.helix.HelixBrokerStarter; import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting; import org.apache.pinot.broker.routing.TimeBoundaryService; @@ -57,7 +57,8 @@ public class 
HelixBrokerStarterTest extends ControllerTest { private static final String RAW_DINING_TABLE_NAME = "dining"; private static final String DINING_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_DINING_TABLE_NAME); private static final String COFFEE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType("coffee"); - private final Configuration _pinotHelixBrokerProperties = DefaultHelixBrokerConfig.getDefaultBrokerConf(); + + private final Configuration _properties = new PropertiesConfiguration(); private ZkClient _zkClient; private HelixBrokerStarter _helixBrokerStarter; @@ -71,11 +72,10 @@ public class HelixBrokerStarterTest extends ControllerTest { startController(); - _pinotHelixBrokerProperties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 8943); - _pinotHelixBrokerProperties - .addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L); - _helixBrokerStarter = - new HelixBrokerStarter(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, _pinotHelixBrokerProperties); + _properties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 8943); + _properties.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L); + _helixBrokerStarter = new HelixBrokerStarter(_properties, getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR); + _helixBrokerStarter.start(); ControllerRequestBuilderUtil .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, 5, true); @@ -139,8 +139,10 @@ public class HelixBrokerStarterTest extends ControllerTest { throws Exception { IdealState idealState; - Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), 6); - idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), 
"DefaultTenant_BROKER").size(), + 6); + idealState = + _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), SEGMENT_COUNT); ExternalView externalView = @@ -173,8 +175,10 @@ public class HelixBrokerStarterTest extends ControllerTest { .setBrokerTenant("testBroker").setServerTenant("testServer").build(); _helixResourceManager.addTable(tableConfig); - Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), 6); - idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), + 6); + idealState = + _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); Assert.assertEquals(idealState.getInstanceSet(COFFEE_TABLE_NAME).size(), SEGMENT_COUNT); Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), SEGMENT_COUNT); @@ -183,8 +187,9 @@ public class HelixBrokerStarterTest extends ControllerTest { @Override public Boolean call() throws Exception { - return _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE) - .getStateMap(COFFEE_TABLE_NAME).size() == SEGMENT_COUNT; + return + _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE) + .getStateMap(COFFEE_TABLE_NAME).size() == SEGMENT_COUNT; } }, 30000L); @@ -273,8 +278,7 @@ public class HelixBrokerStarterTest extends ControllerTest { TimeBoundaryService.TimeBoundaryInfo timeBoundaryInfo = _helixBrokerStarter.getHelixExternalViewBasedRouting(). 
getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME); return currentTimeBoundary < Long.parseLong(timeBoundaryInfo.getTimeValue()); - }, 5 * _pinotHelixBrokerProperties - .getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL)); + }, 5 * _properties.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL)); tbi = _helixBrokerStarter.getHelixExternalViewBasedRouting(). getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME); Assert.assertTrue(currentTimeBoundary < Long.parseLong(tbi.getTimeValue())); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterUtilsTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterUtilsTest.java deleted file mode 100644 index f080f0e..0000000 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterUtilsTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.broker.broker; - -import org.apache.pinot.broker.broker.helix.HelixBrokerStarter; -import org.testng.Assert; -import org.testng.annotations.Test; - - -public class HelixBrokerStarterUtilsTest { - - @Test - public void testZkParserUtil1() { - String zkServers = "hostname1,hostname2"; - String zkAddressForBroker = HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName"); - String expectedZkAddressForBroker = - "hostname1/helixClusterName/PROPERTYSTORE,hostname2/helixClusterName/PROPERTYSTORE"; - Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker); - } - - @Test - public void testZkParserUtil2() { - String zkServers = "hostname1,hostname2/chroot1/chroot2"; - String zkAddressForBroker = HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName"); - String expectedZkAddressForBroker = - "hostname1/chroot1/chroot2/helixClusterName/PROPERTYSTORE,hostname2/chroot1/chroot2/helixClusterName/PROPERTYSTORE"; - Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker); - } - - @Test - public void testZkParserUtil3() { - String zkServers = "hostname1:2181,hostname2:2181"; - String zkAddressForBroker = HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName"); - String expectedZkAddressForBroker = - "hostname1:2181/helixClusterName/PROPERTYSTORE,hostname2:2181/helixClusterName/PROPERTYSTORE"; - Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker); - } - - @Test - public void testZkParserUtil4() { - String zkServers = "hostname1:2181,hostname2:2181/chroot1/chroot2"; - String zkAddressForBroker = HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName"); - String expectedZkAddressForBroker = - "hostname1:2181/chroot1/chroot2/helixClusterName/PROPERTYSTORE,hostname2:2181/chroot1/chroot2/helixClusterName/PROPERTYSTORE"; - Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker); - } -} diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index 137288c..d1e74e6 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -130,6 +130,10 @@ public class CommonConstants { public static final String CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START = "pinot.broker.startup.minResourcePercent"; public static final double DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START = 100.0; + // [PINOT-2435] setting to 0 so it doesn't disconnect from zk + public static final String CONFIG_OF_HELIX_FLAPPING_TIME_WINDOW_MS = "pinot.broker.helix.flappingTimeWindowMs"; + public static final String DEFAULT_HELIX_FLAPPING_TIME_WINDOW_MS = "0"; + public static class Request { public static final String PQL = "pql"; public static final String TRACE = "trace"; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java index f677be3..cc0b4d9 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java @@ -44,13 +44,13 @@ import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.io.FileUtils; import org.apache.http.HttpStatus; import org.apache.pinot.broker.broker.BrokerServerBuilder; -import org.apache.pinot.broker.broker.BrokerTestUtils; import org.apache.pinot.broker.broker.helix.HelixBrokerStarter; import org.apache.pinot.common.config.IndexingConfig; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.config.TableTaskConfig; import org.apache.pinot.common.data.Schema; 
+import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.CommonConstants.Helix; import org.apache.pinot.common.utils.CommonConstants.Minion; import org.apache.pinot.common.utils.CommonConstants.Server; @@ -96,33 +96,38 @@ public abstract class ClusterTest extends ControllerTest { protected TableConfig _offlineTableConfig; protected TableConfig _realtimeTableConfig; - protected void startBroker() { + protected void startBroker() + throws Exception { startBrokers(1); } - protected void startBroker(int basePort, String zkStr) { + protected void startBroker(int basePort, String zkStr) + throws Exception { startBrokers(1, basePort, zkStr); } - protected void startBrokers(int numBrokers) { + protected void startBrokers(int numBrokers) + throws Exception { startBrokers(numBrokers, DEFAULT_BROKER_PORT, ZkStarter.DEFAULT_ZK_STR); } - protected void startBrokers(int numBrokers, int basePort, String zkStr) { + protected void startBrokers(int numBrokers, int basePort, String zkStr) + throws Exception { _brokerBaseApiUrl = "http://localhost:" + basePort; for (int i = 0; i < numBrokers; i++) { - Configuration configuration = BrokerTestUtils.getDefaultBrokerConfiguration(); - configuration.setProperty("pinot.broker.timeoutMs", 100 * 1000L); - configuration.setProperty("pinot.broker.client.queryPort", Integer.toString(basePort + i)); - configuration.setProperty("pinot.broker.routing.table.builder.class", "random"); - configuration.setProperty(BrokerServerBuilder.DELAY_SHUTDOWN_TIME_MS_CONFIG, 0); + Configuration properties = new PropertiesConfiguration(); + properties.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 100 * 1000L); + properties.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT, Integer.toString(basePort + i)); + properties.setProperty(BrokerServerBuilder.DELAY_SHUTDOWN_TIME_MS_CONFIG, 0); // Randomly choose to use connection-pool or single-connection request handler if (RANDOM.nextBoolean()) { - 
configuration.setProperty(BrokerServerBuilder.REQUEST_HANDLER_TYPE_CONFIG, + properties.setProperty(BrokerServerBuilder.REQUEST_HANDLER_TYPE_CONFIG, BrokerServerBuilder.SINGLE_CONNECTION_REQUEST_HANDLER_TYPE); } - overrideBrokerConf(configuration); - _brokerStarters.add(BrokerTestUtils.startBroker(_clusterName, zkStr, configuration)); + overrideBrokerConf(properties); + HelixBrokerStarter brokerStarter = new HelixBrokerStarter(properties, _clusterName, zkStr); + brokerStarter.start(); + _brokerStarters.add(brokerStarter); } } @@ -219,11 +224,7 @@ public abstract class ClusterTest extends ControllerTest { protected void stopBroker() { for (HelixBrokerStarter brokerStarter : _brokerStarters) { - try { - BrokerTestUtils.stopBroker(brokerStarter); - } catch (Exception e) { - LOGGER.error("Encountered exception while stopping broker {}", e.getMessage()); - } + brokerStarter.shutdown(); } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java index 5fb52cc..8f0045d 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java @@ -43,7 +43,8 @@ public class NewConfigApplyIntegrationTest extends BaseClusterIntegrationTest { private static final Logger LOGGER = LoggerFactory.getLogger(NewConfigApplyIntegrationTest.class); @BeforeClass - public void setUp() { + public void setUp() + throws Exception { // Start an empty cluster startZk(); startController(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java index 60bcf2c..01d0cb5 100644 --- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java @@ -41,7 +41,6 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm @Option(name = "-brokerPort", required = false, metaVar = "<int>", usage = "Broker port number to use for query.") private int _brokerPort = CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT; - ; @Option(name = "-zkAddress", required = false, metaVar = "<http>", usage = "HTTP address of Zookeeper.") private String _zkAddress = DEFAULT_ZK_ADDRESS; @@ -59,6 +58,8 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm return _help; } + private HelixBrokerStarter _brokerStarter; + @Override public String getName() { return "StartBroker"; @@ -75,7 +76,9 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm @Override public void cleanup() { - + if (_brokerStarter != null) { + _brokerStarter.shutdown(); + } } @Override @@ -107,23 +110,22 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm public boolean execute() throws Exception { try { - Configuration configuration = readConfigFromFile(_configFileName); - if (configuration == null) { + Configuration properties = readConfigFromFile(_configFileName); + if (properties == null) { if (_configFileName != null) { LOGGER.error("Error: Unable to find file {}.", _configFileName); return false; } - configuration = new PropertiesConfiguration(); - configuration.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, _brokerPort); - configuration.setProperty("pinot.broker.routing.table.builder.class", "random"); + properties = new PropertiesConfiguration(); + properties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, _brokerPort); } LOGGER.info("Executing command: " + toString()); - final HelixBrokerStarter pinotHelixBrokerStarter = - new 
HelixBrokerStarter(_brokerHost, _clusterName, _zkAddress, configuration); + _brokerStarter = new HelixBrokerStarter(properties, _clusterName, _zkAddress, _brokerHost); + _brokerStarter.start(); - String pidFile = ".pinotAdminBroker-" + String.valueOf(System.currentTimeMillis()) + ".pid"; + String pidFile = ".pinotAdminBroker-" + System.currentTimeMillis() + ".pid"; savePID(System.getProperty("java.io.tmpdir") + File.separator + pidFile); return true; } catch (Exception e) { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java index c8a6005..ae28912 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java @@ -214,7 +214,7 @@ public class PerfBenchmarkDriver { brokerConfiguration.setProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY, brokerInstanceName); brokerConfiguration.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, BROKER_TIMEOUT_MS); LOGGER.info("Starting broker instance: {}", brokerInstanceName); - new HelixBrokerStarter(_clusterName, _zkAddress, brokerConfiguration); + new HelixBrokerStarter(brokerConfiguration, _clusterName, _zkAddress).start(); } private void startServer() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
