This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch broker_config in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 2ffd7d532b878c93305041acf0749daf8280a94b Author: Jackie (Xiaotian) Jiang <[email protected]> AuthorDate: Thu Apr 11 21:28:54 2019 -0700 Remove redundant default broker configurations - Remove config based routing configs because the config based routing was removed long time ago - Remove HelixBrokerStarter.getZkAddressForBroker() because it does not apply to the current implementation - Replace config key strings with constants - Change timeout for default broker and broker in integration test to 60s - Replace PropertiesConfiguration with light weight BaseConfiguration if not read from config file --- .../broker/helix/DefaultHelixBrokerConfig.java | 54 --------------- .../broker/broker/helix/HelixBrokerStarter.java | 80 ++++++---------------- .../pinot/broker/broker/BrokerTestUtils.java | 51 -------------- .../broker/broker/HelixBrokerStarterTest.java | 33 +++++---- .../broker/broker/HelixBrokerStarterUtilsTest.java | 63 ----------------- .../pinot/integration/tests/ClusterTest.java | 38 +++++----- .../tests/NewConfigApplyIntegrationTest.java | 3 +- .../tools/admin/command/StartBrokerCommand.java | 16 ++--- .../pinot/tools/perf/PerfBenchmarkDriver.java | 9 +-- 9 files changed, 72 insertions(+), 275 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/DefaultHelixBrokerConfig.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/DefaultHelixBrokerConfig.java deleted file mode 100644 index e7ab072..0000000 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/DefaultHelixBrokerConfig.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.broker.broker.helix; - -import java.util.Iterator; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.PropertiesConfiguration; - - -public class DefaultHelixBrokerConfig { - - public static Configuration getDefaultBrokerConf() { - Configuration brokerConf = new PropertiesConfiguration(); - - // config based routing - brokerConf.addProperty("pinot.broker.transport.routingMode", "HELIX"); - - brokerConf.addProperty("pinot.broker.routing.table.builder.default.offline.class", "balanced"); - brokerConf.addProperty("pinot.broker.routing.table.builder.default.offline.numOfRoutingTables", "10"); - brokerConf.addProperty("pinot.broker.routing.table.builder.tables", ""); - - //client properties - brokerConf.addProperty("pinot.broker.client.queryPort", "8099"); - - return brokerConf; - } - - public static Configuration getDefaultBrokerConf(Configuration externalConfigs) { - final Configuration defaultConfigs = getDefaultBrokerConf(); - @SuppressWarnings("unchecked") - Iterator<String> iterable = externalConfigs.getKeys(); - while (iterable.hasNext()) { - String key = iterable.next(); - defaultConfigs.setProperty(key, externalConfigs.getProperty(key)); - } - return defaultConfigs; - } -} diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java index 081d891..c54f27a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java @@ -25,9 +25,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.lang.StringUtils; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixConstants.ChangeType; import org.apache.helix.HelixDataAccessor; @@ -51,7 +50,6 @@ import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.NetUtil; import org.apache.pinot.common.utils.ServiceStatus; -import org.apache.pinot.common.utils.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,14 +60,15 @@ import org.slf4j.LoggerFactory; * */ public class HelixBrokerStarter { - private static final String PROPERTY_STORE = "PROPERTYSTORE"; + private static final Logger LOGGER = LoggerFactory.getLogger(HelixBrokerStarter.class); + private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = "pinot.broker.routing.table"; // Spectator Helix manager handles the custom change listeners, properties read/write private final HelixManager _spectatorHelixManager; // Participant Helix manager handles Helix functionality such as state transitions and messages private final HelixManager _participantHelixManager; - private final Configuration _pinotHelixProperties; + private final Configuration _brokerConf; private final HelixAdmin _helixAdmin; private final ZkHelixPropertyStore<ZNRecord> _propertyStore; private final HelixDataAccessor _helixDataAccessor; @@ -84,31 +83,26 @@ public class HelixBrokerStarter { // Set after broker is started, which is actually in the constructor. private AccessControlFactory _accessControlFactory; - private static final Logger LOGGER = LoggerFactory.getLogger(HelixBrokerStarter.class); - - private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = "pinot.broker.routing.table"; - - public HelixBrokerStarter(String helixClusterName, String zkServer, Configuration pinotHelixProperties) + public HelixBrokerStarter(String helixClusterName, String zkServer, Configuration brokerConf) throws Exception { - this(null, helixClusterName, zkServer, pinotHelixProperties); + this(null, helixClusterName, zkServer, brokerConf); } - public HelixBrokerStarter(String brokerHost, String helixClusterName, String zkServer, - Configuration pinotHelixProperties) + public HelixBrokerStarter(String brokerHost, String helixClusterName, String zkServer, Configuration brokerConf) throws Exception { LOGGER.info("Starting Pinot broker"); - _pinotHelixProperties = DefaultHelixBrokerConfig.getDefaultBrokerConf(pinotHelixProperties); + _brokerConf = brokerConf; if (brokerHost == null) { brokerHost = NetUtil.getHostAddress(); } - final String brokerId = _pinotHelixProperties.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY, - CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" + _pinotHelixProperties + String brokerId = _brokerConf.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY, + CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" + _brokerConf .getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT)); - _pinotHelixProperties.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID, brokerId); + _brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID, brokerId); setupHelixSystemProperties(); // Remove all white-spaces from the list of zkServers (if any). @@ -123,10 +117,10 @@ public class HelixBrokerStarter { _propertyStore = _spectatorHelixManager.getHelixPropertyStore(); _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor(); _helixExternalViewBasedRouting = new HelixExternalViewBasedRouting(_propertyStore, _spectatorHelixManager, - pinotHelixProperties.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY)); + brokerConf.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY)); _tableQueryQuotaManager = new TableQueryQuotaManager(_spectatorHelixManager); _liveInstanceChangeHandler = new LiveInstanceChangeHandler(_spectatorHelixManager); - _brokerServerBuilder = startBroker(_pinotHelixProperties); + _brokerServerBuilder = startBroker(_brokerConf); _metricsRegistry = _brokerServerBuilder.getMetricsRegistry(); // Initialize cluster change mediator @@ -160,8 +154,8 @@ public class HelixBrokerStarter { stateMachineEngine .registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(), stateModelFactory); _participantHelixManager.connect(); - _tbiMessageHandler = new TimeboundaryRefreshMessageHandlerFactory(_helixExternalViewBasedRouting, - _pinotHelixProperties.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, + _tbiMessageHandler = new TimeboundaryRefreshMessageHandlerFactory(_helixExternalViewBasedRouting, _brokerConf + .getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, CommonConstants.Broker.DEFAULT_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL_MS)); _participantHelixManager.getMessagingService() .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), _tbiMessageHandler); @@ -169,7 +163,7 @@ public class HelixBrokerStarter { addInstanceTagIfNeeded(helixClusterName, brokerId); // Register the service status handler - double minResourcePercentForStartup = _pinotHelixProperties + double minResourcePercentForStartup = _brokerConf .getDouble(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START, CommonConstants.Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START); ServiceStatus.setServiceStatusCallback(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList @@ -189,7 +183,7 @@ public class HelixBrokerStarter { // NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect // from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the // non-positive value, so set the default value as 1. - System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _pinotHelixProperties + System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _brokerConf .getString(CommonConstants.Helix.CONFIG_OF_BROKER_FLAPPING_TIME_WINDOW_MS, CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS)); } @@ -209,9 +203,6 @@ public class HelixBrokerStarter { } private BrokerServerBuilder startBroker(Configuration config) { - if (config == null) { - config = DefaultHelixBrokerConfig.getDefaultBrokerConf(); - } BrokerServerBuilder brokerServerBuilder = new BrokerServerBuilder(config, _helixExternalViewBasedRouting, _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstanceChangeHandler, _tableQueryQuotaManager); _accessControlFactory = brokerServerBuilder.getAccessControlFactory(); @@ -278,32 +269,6 @@ public class HelixBrokerStarter { return _accessControlFactory; } - /** - * The zk string format should be 127.0.0.1:3000,127.0.0.1:3001/app/a which applies - * the /helixClusterName/PROPERTY_STORE after chroot to all servers. - * Expected output for this method is: - * 127.0.0.1:3000/app/a/helixClusterName/PROPERTY_STORE,127.0.0.1:3001/app/a/helixClusterName/PROPERTY_STORE - * - * @param zkServers - * @param helixClusterName - * @return the full property store path - * - * @see org.apache.zookeeper.ZooKeeper#ZooKeeper(String, int, org.apache.zookeeper.Watcher) - */ - public static String getZkAddressForBroker(String zkServers, String helixClusterName) { - List tokens = new ArrayList<String>(); - String[] zkSplit = zkServers.split("/", 2); - String zkHosts = zkSplit[0]; - String zkPathSuffix = StringUtil.join("/", helixClusterName, PROPERTY_STORE); - if (zkSplit.length > 1) { - zkPathSuffix = zkSplit[1] + "/" + zkPathSuffix; - } - for (String token : zkHosts.split(",")) { - tokens.add(StringUtil.join("/", StringUtils.chomp(token, "/"), zkPathSuffix)); - } - return StringUtils.join(tokens, ","); - } - public HelixManager getSpectatorHelixManager() { return _spectatorHelixManager; } @@ -318,14 +283,11 @@ public class HelixBrokerStarter { public static HelixBrokerStarter startDefault() throws Exception { - Configuration configuration = new PropertiesConfiguration(); + Configuration brokerConf = new BaseConfiguration(); int port = 5001; - configuration.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, port); - configuration.addProperty("pinot.broker.timeoutMs", 500 * 1000L); - - final HelixBrokerStarter pinotHelixBrokerStarter = - new HelixBrokerStarter(null, "quickstart", "localhost:2122", configuration); - return pinotHelixBrokerStarter; + brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, port); + brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L); + return new HelixBrokerStarter(null, "quickstart", "localhost:2122", brokerConf); } public void shutdown() { diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerTestUtils.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerTestUtils.java deleted file mode 100644 index f863e56..0000000 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerTestUtils.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.broker.broker; - -import org.apache.commons.configuration.Configuration; -import org.apache.pinot.broker.broker.helix.DefaultHelixBrokerConfig; -import org.apache.pinot.broker.broker.helix.HelixBrokerStarter; - - -/** - * Utilities to start a broker during unit tests. - * - */ -public class BrokerTestUtils { - public static Configuration getDefaultBrokerConfiguration() { - return DefaultHelixBrokerConfig.getDefaultBrokerConf(); - } - - public static HelixBrokerStarter startBroker(final String clusterName, final String zkStr, - final Configuration configuration) { - try { - return new HelixBrokerStarter(clusterName, zkStr, configuration); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static void stopBroker(final HelixBrokerStarter brokerStarter) { - try { - brokerStarter.getBrokerServerBuilder().stop(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java index 08387a8..b3a0452 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java @@ -29,10 +29,10 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.ZkClient; +import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; -import org.apache.pinot.broker.broker.helix.DefaultHelixBrokerConfig; import org.apache.pinot.broker.broker.helix.HelixBrokerStarter; import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting; import org.apache.pinot.broker.routing.TimeBoundaryService; @@ -57,7 +57,8 @@ public class HelixBrokerStarterTest extends ControllerTest { private static final String RAW_DINING_TABLE_NAME = "dining"; private static final String DINING_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_DINING_TABLE_NAME); private static final String COFFEE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType("coffee"); - private final Configuration _pinotHelixBrokerProperties = DefaultHelixBrokerConfig.getDefaultBrokerConf(); + + private final Configuration _brokerConf = new BaseConfiguration(); private ZkClient _zkClient; private HelixBrokerStarter _helixBrokerStarter; @@ -71,11 +72,9 @@ public class HelixBrokerStarterTest extends ControllerTest { startController(); - _pinotHelixBrokerProperties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 8943); - _pinotHelixBrokerProperties - .addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L); - _helixBrokerStarter = - new HelixBrokerStarter(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, _pinotHelixBrokerProperties); + _brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 8943); + _brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L); + _helixBrokerStarter = new HelixBrokerStarter(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, _brokerConf); ControllerRequestBuilderUtil .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, 5, true); @@ -139,8 +138,10 @@ public class HelixBrokerStarterTest extends ControllerTest { throws Exception { IdealState idealState; - Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), 6); - idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), + 6); + idealState = + _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), SEGMENT_COUNT); ExternalView externalView = @@ -173,8 +174,10 @@ public class HelixBrokerStarterTest extends ControllerTest { .setBrokerTenant("testBroker").setServerTenant("testServer").build(); _helixResourceManager.addTable(tableConfig); - Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), 6); - idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), + 6); + idealState = + _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); Assert.assertEquals(idealState.getInstanceSet(COFFEE_TABLE_NAME).size(), SEGMENT_COUNT); Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), SEGMENT_COUNT); @@ -183,8 +186,9 @@ public class HelixBrokerStarterTest extends ControllerTest { @Override public Boolean call() throws Exception { - return _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE) - .getStateMap(COFFEE_TABLE_NAME).size() == SEGMENT_COUNT; + return + _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE) + .getStateMap(COFFEE_TABLE_NAME).size() == SEGMENT_COUNT; } }, 30000L); @@ -273,8 +277,7 @@ public class HelixBrokerStarterTest extends ControllerTest { TimeBoundaryService.TimeBoundaryInfo timeBoundaryInfo = _helixBrokerStarter.getHelixExternalViewBasedRouting(). getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME); return currentTimeBoundary < Long.parseLong(timeBoundaryInfo.getTimeValue()); - }, 5 * _pinotHelixBrokerProperties - .getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL)); + }, 5 * _brokerConf.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL)); tbi = _helixBrokerStarter.getHelixExternalViewBasedRouting(). getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME); Assert.assertTrue(currentTimeBoundary < Long.parseLong(tbi.getTimeValue())); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterUtilsTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterUtilsTest.java deleted file mode 100644 index f080f0e..0000000 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterUtilsTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.broker.broker; - -import org.apache.pinot.broker.broker.helix.HelixBrokerStarter; -import org.testng.Assert; -import org.testng.annotations.Test; - - -public class HelixBrokerStarterUtilsTest { - - @Test - public void testZkParserUtil1() { - String zkServers = "hostname1,hostname2"; - String zkAddressForBroker = HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName"); - String expectedZkAddressForBroker = - "hostname1/helixClusterName/PROPERTYSTORE,hostname2/helixClusterName/PROPERTYSTORE"; - Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker); - } - - @Test - public void testZkParserUtil2() { - String zkServers = "hostname1,hostname2/chroot1/chroot2"; - String zkAddressForBroker = HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName"); - String expectedZkAddressForBroker = - "hostname1/chroot1/chroot2/helixClusterName/PROPERTYSTORE,hostname2/chroot1/chroot2/helixClusterName/PROPERTYSTORE"; - Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker); - } - - @Test - public void testZkParserUtil3() { - String zkServers = "hostname1:2181,hostname2:2181"; - String zkAddressForBroker = HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName"); - String expectedZkAddressForBroker = - "hostname1:2181/helixClusterName/PROPERTYSTORE,hostname2:2181/helixClusterName/PROPERTYSTORE"; - Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker); - } - - @Test - public void testZkParserUtil4() { - String zkServers = "hostname1:2181,hostname2:2181/chroot1/chroot2"; - String zkAddressForBroker = HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName"); - String expectedZkAddressForBroker = - "hostname1:2181/chroot1/chroot2/helixClusterName/PROPERTYSTORE,hostname2:2181/chroot1/chroot2/helixClusterName/PROPERTYSTORE"; - Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker); - } -} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java index f677be3..5d67305 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java @@ -39,18 +39,19 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; +import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.io.FileUtils; import org.apache.http.HttpStatus; import org.apache.pinot.broker.broker.BrokerServerBuilder; -import org.apache.pinot.broker.broker.BrokerTestUtils; import org.apache.pinot.broker.broker.helix.HelixBrokerStarter; import org.apache.pinot.common.config.IndexingConfig; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.config.TableTaskConfig; import org.apache.pinot.common.data.Schema; +import org.apache.pinot.common.utils.CommonConstants.Broker; import org.apache.pinot.common.utils.CommonConstants.Helix; import org.apache.pinot.common.utils.CommonConstants.Minion; import org.apache.pinot.common.utils.CommonConstants.Server; @@ -96,33 +97,36 @@ public abstract class ClusterTest extends ControllerTest { protected TableConfig _offlineTableConfig; protected TableConfig _realtimeTableConfig; - protected void startBroker() { + protected void startBroker() + throws Exception { startBrokers(1); } - protected void startBroker(int basePort, String zkStr) { + protected void startBroker(int basePort, String zkStr) + throws Exception { startBrokers(1, basePort, zkStr); } - protected void startBrokers(int numBrokers) { + protected void startBrokers(int numBrokers) + throws Exception { startBrokers(numBrokers, DEFAULT_BROKER_PORT, ZkStarter.DEFAULT_ZK_STR); } - protected void startBrokers(int numBrokers, int basePort, String zkStr) { + protected void startBrokers(int numBrokers, int basePort, String zkStr) + throws Exception { _brokerBaseApiUrl = "http://localhost:" + basePort; for (int i = 0; i < numBrokers; i++) { - Configuration configuration = BrokerTestUtils.getDefaultBrokerConfiguration(); - configuration.setProperty("pinot.broker.timeoutMs", 100 * 1000L); - configuration.setProperty("pinot.broker.client.queryPort", Integer.toString(basePort + i)); - configuration.setProperty("pinot.broker.routing.table.builder.class", "random"); - configuration.setProperty(BrokerServerBuilder.DELAY_SHUTDOWN_TIME_MS_CONFIG, 0); + Configuration brokerConf = new BaseConfiguration(); + brokerConf.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L); + brokerConf.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT, Integer.toString(basePort + i)); + brokerConf.setProperty(BrokerServerBuilder.DELAY_SHUTDOWN_TIME_MS_CONFIG, 0); // Randomly choose to use connection-pool or single-connection request handler if (RANDOM.nextBoolean()) { - configuration.setProperty(BrokerServerBuilder.REQUEST_HANDLER_TYPE_CONFIG, + brokerConf.setProperty(BrokerServerBuilder.REQUEST_HANDLER_TYPE_CONFIG, BrokerServerBuilder.SINGLE_CONNECTION_REQUEST_HANDLER_TYPE); } - overrideBrokerConf(configuration); - _brokerStarters.add(BrokerTestUtils.startBroker(_clusterName, zkStr, configuration)); + overrideBrokerConf(brokerConf); + _brokerStarters.add(new HelixBrokerStarter(_clusterName, zkStr, brokerConf)); } } @@ -213,17 +217,13 @@ public abstract class ClusterTest extends ControllerTest { // Do nothing, to be overridden by tests if they need something specific } - protected void overrideBrokerConf(Configuration configuration) { + protected void overrideBrokerConf(Configuration brokerConf) { // Do nothing, to be overridden by tests if they need something specific } protected void stopBroker() { for (HelixBrokerStarter brokerStarter : _brokerStarters) { - try { - BrokerTestUtils.stopBroker(brokerStarter); - } catch (Exception e) { - LOGGER.error("Encountered exception while stopping broker {}", e.getMessage()); - } + brokerStarter.shutdown(); } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java index 5fb52cc..8f0045d 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java @@ -43,7 +43,8 @@ public class NewConfigApplyIntegrationTest extends BaseClusterIntegrationTest { private static final Logger LOGGER = LoggerFactory.getLogger(NewConfigApplyIntegrationTest.class); @BeforeClass - public void setUp() { + public void setUp() + throws Exception { // Start an empty cluster startZk(); startController(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java index 60bcf2c..52b046f 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java @@ -19,8 +19,8 @@ package org.apache.pinot.tools.admin.command; import java.io.File; +import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.pinot.broker.broker.helix.HelixBrokerStarter; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.tools.Command; @@ -107,23 +107,21 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm public boolean execute() throws Exception { try { - Configuration configuration = readConfigFromFile(_configFileName); - if (configuration == null) { + Configuration brokerConf = readConfigFromFile(_configFileName); + if (brokerConf == null) { if (_configFileName != null) { LOGGER.error("Error: Unable to find file {}.", _configFileName); return false; } - configuration = new PropertiesConfiguration(); - configuration.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, _brokerPort); - configuration.setProperty("pinot.broker.routing.table.builder.class", "random"); + brokerConf = new BaseConfiguration(); + brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, _brokerPort); } LOGGER.info("Executing command: " + toString()); - final HelixBrokerStarter pinotHelixBrokerStarter = - new HelixBrokerStarter(_brokerHost, _clusterName, _zkAddress, configuration); + new HelixBrokerStarter(_brokerHost, _clusterName, _zkAddress, brokerConf); - String pidFile = ".pinotAdminBroker-" + String.valueOf(System.currentTimeMillis()) + ".pid"; + String pidFile = ".pinotAdminBroker-" + System.currentTimeMillis() + ".pid"; savePID(System.getProperty("java.io.tmpdir") + File.separator + pidFile); return true; } catch (Exception e) { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java index c8a6005..f9ac81c 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java @@ -35,6 +35,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import javax.annotation.Nullable; +import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.helix.manager.zk.ZKHelixAdmin; @@ -209,12 +210,12 @@ public class PerfBenchmarkDriver { LOGGER.info("Skipping start broker step. Assumes broker is already started."); return; } - Configuration brokerConfiguration = new PropertiesConfiguration(); + Configuration brokerConf = new BaseConfiguration(); String brokerInstanceName = "Broker_localhost_" + CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT; - brokerConfiguration.setProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY, brokerInstanceName); - brokerConfiguration.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, BROKER_TIMEOUT_MS); + brokerConf.setProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY, brokerInstanceName); + brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, BROKER_TIMEOUT_MS); LOGGER.info("Starting broker instance: {}", brokerInstanceName); - new HelixBrokerStarter(_clusterName, _zkAddress, brokerConfiguration); + new HelixBrokerStarter(_clusterName, _zkAddress, brokerConf); } private void startServer() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
