This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 79b3368 Remove redundant default broker configurations (#4106)
79b3368 is described below
commit 79b33687a7849e01800d5acdd6f2eefecee82998
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Apr 16 17:48:04 2019 -0700
Remove redundant default broker configurations (#4106)
- Remove config based routing configs because the config based
routing was removed long time ago
- Remove HelixBrokerStarter.getZkAddressForBroker() because it
does not apply to the current implementation
- Replace config key strings with constants
- Change timeout for default broker and broker in integration
test to 60s
- Replace PropertiesConfiguration with light weight
BaseConfiguration if not read from config file
---
.../broker/helix/DefaultHelixBrokerConfig.java | 54 --------------
.../broker/broker/helix/HelixBrokerStarter.java | 82 ++++++----------------
.../pinot/broker/broker/BrokerTestUtils.java | 51 --------------
.../broker/broker/HelixBrokerStarterTest.java | 33 +++++----
.../broker/broker/HelixBrokerStarterUtilsTest.java | 63 -----------------
.../pinot/integration/tests/ClusterTest.java | 35 +++++----
.../tests/NewConfigApplyIntegrationTest.java | 3 +-
.../tools/admin/command/StartBrokerCommand.java | 16 ++---
.../pinot/tools/perf/PerfBenchmarkDriver.java | 9 +--
9 files changed, 75 insertions(+), 271 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/DefaultHelixBrokerConfig.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/DefaultHelixBrokerConfig.java
deleted file mode 100644
index e7ab072..0000000
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/DefaultHelixBrokerConfig.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.broker.helix;
-
-import java.util.Iterator;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-
-
-public class DefaultHelixBrokerConfig {
-
- public static Configuration getDefaultBrokerConf() {
- Configuration brokerConf = new PropertiesConfiguration();
-
- // config based routing
- brokerConf.addProperty("pinot.broker.transport.routingMode", "HELIX");
-
-
brokerConf.addProperty("pinot.broker.routing.table.builder.default.offline.class",
"balanced");
-
brokerConf.addProperty("pinot.broker.routing.table.builder.default.offline.numOfRoutingTables",
"10");
- brokerConf.addProperty("pinot.broker.routing.table.builder.tables", "");
-
- //client properties
- brokerConf.addProperty("pinot.broker.client.queryPort", "8099");
-
- return brokerConf;
- }
-
- public static Configuration getDefaultBrokerConf(Configuration
externalConfigs) {
- final Configuration defaultConfigs = getDefaultBrokerConf();
- @SuppressWarnings("unchecked")
- Iterator<String> iterable = externalConfigs.getKeys();
- while (iterable.hasNext()) {
- String key = iterable.next();
- defaultConfigs.setProperty(key, externalConfigs.getProperty(key));
- }
- return defaultConfigs;
- }
-}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index 081d891..7bc9c8d 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -25,9 +25,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang.StringUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixDataAccessor;
@@ -51,7 +51,6 @@ import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.common.utils.ServiceStatus;
-import org.apache.pinot.common.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,14 +61,15 @@ import org.slf4j.LoggerFactory;
*
*/
public class HelixBrokerStarter {
- private static final String PROPERTY_STORE = "PROPERTYSTORE";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HelixBrokerStarter.class);
+ private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY =
"pinot.broker.routing.table";
// Spectator Helix manager handles the custom change listeners, properties
read/write
private final HelixManager _spectatorHelixManager;
// Participant Helix manager handles Helix functionality such as state
transitions and messages
private final HelixManager _participantHelixManager;
- private final Configuration _pinotHelixProperties;
+ private final Configuration _brokerConf;
private final HelixAdmin _helixAdmin;
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final HelixDataAccessor _helixDataAccessor;
@@ -84,31 +84,27 @@ public class HelixBrokerStarter {
// Set after broker is started, which is actually in the constructor.
private AccessControlFactory _accessControlFactory;
- private static final Logger LOGGER =
LoggerFactory.getLogger(HelixBrokerStarter.class);
-
- private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY =
"pinot.broker.routing.table";
-
- public HelixBrokerStarter(String helixClusterName, String zkServer,
Configuration pinotHelixProperties)
+ public HelixBrokerStarter(String helixClusterName, String zkServer,
Configuration brokerConf)
throws Exception {
- this(null, helixClusterName, zkServer, pinotHelixProperties);
+ this(null, helixClusterName, zkServer, brokerConf);
}
- public HelixBrokerStarter(String brokerHost, String helixClusterName, String
zkServer,
- Configuration pinotHelixProperties)
+ public HelixBrokerStarter(@Nullable String brokerHost, String
helixClusterName, String zkServer,
+ Configuration brokerConf)
throws Exception {
LOGGER.info("Starting Pinot broker");
- _pinotHelixProperties =
DefaultHelixBrokerConfig.getDefaultBrokerConf(pinotHelixProperties);
+ _brokerConf = brokerConf;
if (brokerHost == null) {
brokerHost = NetUtil.getHostAddress();
}
- final String brokerId =
_pinotHelixProperties.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
- CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" +
_pinotHelixProperties
+ String brokerId =
_brokerConf.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
+ CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" +
_brokerConf
.getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT,
CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT));
-
_pinotHelixProperties.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID,
brokerId);
+ _brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID,
brokerId);
setupHelixSystemProperties();
// Remove all white-spaces from the list of zkServers (if any).
@@ -123,10 +119,10 @@ public class HelixBrokerStarter {
_propertyStore = _spectatorHelixManager.getHelixPropertyStore();
_helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
_helixExternalViewBasedRouting = new
HelixExternalViewBasedRouting(_propertyStore, _spectatorHelixManager,
- pinotHelixProperties.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
+ brokerConf.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
_tableQueryQuotaManager = new
TableQueryQuotaManager(_spectatorHelixManager);
_liveInstanceChangeHandler = new
LiveInstanceChangeHandler(_spectatorHelixManager);
- _brokerServerBuilder = startBroker(_pinotHelixProperties);
+ _brokerServerBuilder = startBroker(_brokerConf);
_metricsRegistry = _brokerServerBuilder.getMetricsRegistry();
// Initialize cluster change mediator
@@ -160,8 +156,8 @@ public class HelixBrokerStarter {
stateMachineEngine
.registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(),
stateModelFactory);
_participantHelixManager.connect();
- _tbiMessageHandler = new
TimeboundaryRefreshMessageHandlerFactory(_helixExternalViewBasedRouting,
-
_pinotHelixProperties.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL,
+ _tbiMessageHandler = new
TimeboundaryRefreshMessageHandlerFactory(_helixExternalViewBasedRouting,
_brokerConf
+
.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL,
CommonConstants.Broker.DEFAULT_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL_MS));
_participantHelixManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
_tbiMessageHandler);
@@ -169,7 +165,7 @@ public class HelixBrokerStarter {
addInstanceTagIfNeeded(helixClusterName, brokerId);
// Register the service status handler
- double minResourcePercentForStartup = _pinotHelixProperties
+ double minResourcePercentForStartup = _brokerConf
.getDouble(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START,
CommonConstants.Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START);
ServiceStatus.setServiceStatusCallback(new
ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
@@ -189,7 +185,7 @@ public class HelixBrokerStarter {
// NOTE: Helix will disconnect the manager and disable the instance if it
detects flapping (too frequent disconnect
// from ZooKeeper). Setting flapping time window to a small value can
avoid this from happening. Helix ignores the
// non-positive value, so set the default value as 1.
- System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW,
_pinotHelixProperties
+ System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _brokerConf
.getString(CommonConstants.Helix.CONFIG_OF_BROKER_FLAPPING_TIME_WINDOW_MS,
CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
}
@@ -209,9 +205,6 @@ public class HelixBrokerStarter {
}
private BrokerServerBuilder startBroker(Configuration config) {
- if (config == null) {
- config = DefaultHelixBrokerConfig.getDefaultBrokerConf();
- }
BrokerServerBuilder brokerServerBuilder = new BrokerServerBuilder(config,
_helixExternalViewBasedRouting,
_helixExternalViewBasedRouting.getTimeBoundaryService(),
_liveInstanceChangeHandler, _tableQueryQuotaManager);
_accessControlFactory = brokerServerBuilder.getAccessControlFactory();
@@ -278,32 +271,6 @@ public class HelixBrokerStarter {
return _accessControlFactory;
}
- /**
- * The zk string format should be 127.0.0.1:3000,127.0.0.1:3001/app/a which
applies
- * the /helixClusterName/PROPERTY_STORE after chroot to all servers.
- * Expected output for this method is:
- *
127.0.0.1:3000/app/a/helixClusterName/PROPERTY_STORE,127.0.0.1:3001/app/a/helixClusterName/PROPERTY_STORE
- *
- * @param zkServers
- * @param helixClusterName
- * @return the full property store path
- *
- * @see org.apache.zookeeper.ZooKeeper#ZooKeeper(String, int,
org.apache.zookeeper.Watcher)
- */
- public static String getZkAddressForBroker(String zkServers, String
helixClusterName) {
- List tokens = new ArrayList<String>();
- String[] zkSplit = zkServers.split("/", 2);
- String zkHosts = zkSplit[0];
- String zkPathSuffix = StringUtil.join("/", helixClusterName,
PROPERTY_STORE);
- if (zkSplit.length > 1) {
- zkPathSuffix = zkSplit[1] + "/" + zkPathSuffix;
- }
- for (String token : zkHosts.split(",")) {
- tokens.add(StringUtil.join("/", StringUtils.chomp(token, "/"),
zkPathSuffix));
- }
- return StringUtils.join(tokens, ",");
- }
-
public HelixManager getSpectatorHelixManager() {
return _spectatorHelixManager;
}
@@ -318,14 +285,11 @@ public class HelixBrokerStarter {
public static HelixBrokerStarter startDefault()
throws Exception {
- Configuration configuration = new PropertiesConfiguration();
+ Configuration brokerConf = new BaseConfiguration();
int port = 5001;
- configuration.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT,
port);
- configuration.addProperty("pinot.broker.timeoutMs", 500 * 1000L);
-
- final HelixBrokerStarter pinotHelixBrokerStarter =
- new HelixBrokerStarter(null, "quickstart", "localhost:2122",
configuration);
- return pinotHelixBrokerStarter;
+ brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT,
port);
+ brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS,
60 * 1000L);
+ return new HelixBrokerStarter(null, "quickstart", "localhost:2122",
brokerConf);
}
public void shutdown() {
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerTestUtils.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerTestUtils.java
deleted file mode 100644
index f863e56..0000000
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerTestUtils.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.broker;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.pinot.broker.broker.helix.DefaultHelixBrokerConfig;
-import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
-
-
-/**
- * Utilities to start a broker during unit tests.
- *
- */
-public class BrokerTestUtils {
- public static Configuration getDefaultBrokerConfiguration() {
- return DefaultHelixBrokerConfig.getDefaultBrokerConf();
- }
-
- public static HelixBrokerStarter startBroker(final String clusterName, final
String zkStr,
- final Configuration configuration) {
- try {
- return new HelixBrokerStarter(clusterName, zkStr, configuration);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public static void stopBroker(final HelixBrokerStarter brokerStarter) {
- try {
- brokerStarter.getBrokerServerBuilder().stop();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 08387a8..b3a0452 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -29,10 +29,10 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
-import org.apache.pinot.broker.broker.helix.DefaultHelixBrokerConfig;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
import org.apache.pinot.broker.routing.TimeBoundaryService;
@@ -57,7 +57,8 @@ public class HelixBrokerStarterTest extends ControllerTest {
private static final String RAW_DINING_TABLE_NAME = "dining";
private static final String DINING_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_DINING_TABLE_NAME);
private static final String COFFEE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType("coffee");
- private final Configuration _pinotHelixBrokerProperties =
DefaultHelixBrokerConfig.getDefaultBrokerConf();
+
+ private final Configuration _brokerConf = new BaseConfiguration();
private ZkClient _zkClient;
private HelixBrokerStarter _helixBrokerStarter;
@@ -71,11 +72,9 @@ public class HelixBrokerStarterTest extends ControllerTest {
startController();
-
_pinotHelixBrokerProperties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT,
8943);
- _pinotHelixBrokerProperties
-
.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL,
100L);
- _helixBrokerStarter =
- new HelixBrokerStarter(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR, _pinotHelixBrokerProperties);
+ _brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT,
8943);
+
_brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL,
100L);
+ _helixBrokerStarter = new HelixBrokerStarter(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR, _brokerConf);
ControllerRequestBuilderUtil
.addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR, 5, true);
@@ -139,8 +138,10 @@ public class HelixBrokerStarterTest extends ControllerTest
{
throws Exception {
IdealState idealState;
-
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_BROKER").size(), 6);
- idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_BROKER").size(),
+ 6);
+ idealState =
+ _helixAdmin.getResourceIdealState(getHelixClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(),
SEGMENT_COUNT);
ExternalView externalView =
@@ -173,8 +174,10 @@ public class HelixBrokerStarterTest extends ControllerTest
{
.setBrokerTenant("testBroker").setServerTenant("testServer").build();
_helixResourceManager.addTable(tableConfig);
-
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_BROKER").size(), 6);
- idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_BROKER").size(),
+ 6);
+ idealState =
+ _helixAdmin.getResourceIdealState(getHelixClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
Assert.assertEquals(idealState.getInstanceSet(COFFEE_TABLE_NAME).size(),
SEGMENT_COUNT);
Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(),
SEGMENT_COUNT);
@@ -183,8 +186,9 @@ public class HelixBrokerStarterTest extends ControllerTest {
@Override
public Boolean call()
throws Exception {
- return _helixAdmin.getResourceExternalView(getHelixClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
- .getStateMap(COFFEE_TABLE_NAME).size() == SEGMENT_COUNT;
+ return
+ _helixAdmin.getResourceExternalView(getHelixClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+ .getStateMap(COFFEE_TABLE_NAME).size() == SEGMENT_COUNT;
}
}, 30000L);
@@ -273,8 +277,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
TimeBoundaryService.TimeBoundaryInfo timeBoundaryInfo =
_helixBrokerStarter.getHelixExternalViewBasedRouting().
getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME);
return currentTimeBoundary <
Long.parseLong(timeBoundaryInfo.getTimeValue());
- }, 5 * _pinotHelixBrokerProperties
-
.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL));
+ }, 5 *
_brokerConf.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL));
tbi = _helixBrokerStarter.getHelixExternalViewBasedRouting().
getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME);
Assert.assertTrue(currentTimeBoundary <
Long.parseLong(tbi.getTimeValue()));
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterUtilsTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterUtilsTest.java
deleted file mode 100644
index f080f0e..0000000
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterUtilsTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.broker;
-
-import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-public class HelixBrokerStarterUtilsTest {
-
- @Test
- public void testZkParserUtil1() {
- String zkServers = "hostname1,hostname2";
- String zkAddressForBroker =
HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName");
- String expectedZkAddressForBroker =
-
"hostname1/helixClusterName/PROPERTYSTORE,hostname2/helixClusterName/PROPERTYSTORE";
- Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker);
- }
-
- @Test
- public void testZkParserUtil2() {
- String zkServers = "hostname1,hostname2/chroot1/chroot2";
- String zkAddressForBroker =
HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName");
- String expectedZkAddressForBroker =
-
"hostname1/chroot1/chroot2/helixClusterName/PROPERTYSTORE,hostname2/chroot1/chroot2/helixClusterName/PROPERTYSTORE";
- Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker);
- }
-
- @Test
- public void testZkParserUtil3() {
- String zkServers = "hostname1:2181,hostname2:2181";
- String zkAddressForBroker =
HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName");
- String expectedZkAddressForBroker =
-
"hostname1:2181/helixClusterName/PROPERTYSTORE,hostname2:2181/helixClusterName/PROPERTYSTORE";
- Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker);
- }
-
- @Test
- public void testZkParserUtil4() {
- String zkServers = "hostname1:2181,hostname2:2181/chroot1/chroot2";
- String zkAddressForBroker =
HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName");
- String expectedZkAddressForBroker =
-
"hostname1:2181/chroot1/chroot2/helixClusterName/PROPERTYSTORE,hostname2:2181/chroot1/chroot2/helixClusterName/PROPERTYSTORE";
- Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker);
- }
-}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index f677be3..5d9ba61 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -39,18 +39,19 @@ import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
+import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.http.HttpStatus;
import org.apache.pinot.broker.broker.BrokerServerBuilder;
-import org.apache.pinot.broker.broker.BrokerTestUtils;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.common.config.IndexingConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.config.TableTaskConfig;
import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.CommonConstants.Broker;
import org.apache.pinot.common.utils.CommonConstants.Helix;
import org.apache.pinot.common.utils.CommonConstants.Minion;
import org.apache.pinot.common.utils.CommonConstants.Server;
@@ -96,33 +97,36 @@ public abstract class ClusterTest extends ControllerTest {
protected TableConfig _offlineTableConfig;
protected TableConfig _realtimeTableConfig;
- protected void startBroker() {
+ protected void startBroker()
+ throws Exception {
startBrokers(1);
}
- protected void startBroker(int basePort, String zkStr) {
+ protected void startBroker(int basePort, String zkStr)
+ throws Exception {
startBrokers(1, basePort, zkStr);
}
- protected void startBrokers(int numBrokers) {
+ protected void startBrokers(int numBrokers)
+ throws Exception {
startBrokers(numBrokers, DEFAULT_BROKER_PORT, ZkStarter.DEFAULT_ZK_STR);
}
- protected void startBrokers(int numBrokers, int basePort, String zkStr) {
+ protected void startBrokers(int numBrokers, int basePort, String zkStr)
+ throws Exception {
_brokerBaseApiUrl = "http://localhost:" + basePort;
for (int i = 0; i < numBrokers; i++) {
- Configuration configuration =
BrokerTestUtils.getDefaultBrokerConfiguration();
- configuration.setProperty("pinot.broker.timeoutMs", 100 * 1000L);
- configuration.setProperty("pinot.broker.client.queryPort",
Integer.toString(basePort + i));
- configuration.setProperty("pinot.broker.routing.table.builder.class",
"random");
-
configuration.setProperty(BrokerServerBuilder.DELAY_SHUTDOWN_TIME_MS_CONFIG, 0);
+ Configuration brokerConf = new BaseConfiguration();
+ brokerConf.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
+ brokerConf.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT,
Integer.toString(basePort + i));
+
brokerConf.setProperty(BrokerServerBuilder.DELAY_SHUTDOWN_TIME_MS_CONFIG, 0);
// Randomly choose to use connection-pool or single-connection request
handler
if (RANDOM.nextBoolean()) {
-
configuration.setProperty(BrokerServerBuilder.REQUEST_HANDLER_TYPE_CONFIG,
+ brokerConf.setProperty(BrokerServerBuilder.REQUEST_HANDLER_TYPE_CONFIG,
BrokerServerBuilder.SINGLE_CONNECTION_REQUEST_HANDLER_TYPE);
}
- overrideBrokerConf(configuration);
- _brokerStarters.add(BrokerTestUtils.startBroker(_clusterName, zkStr,
configuration));
+ overrideBrokerConf(brokerConf);
+ _brokerStarters.add(new HelixBrokerStarter(_clusterName, zkStr,
brokerConf));
}
}
@@ -213,14 +217,15 @@ public abstract class ClusterTest extends ControllerTest {
// Do nothing, to be overridden by tests if they need something specific
}
- protected void overrideBrokerConf(Configuration configuration) {
+ protected void overrideBrokerConf(Configuration brokerConf) {
// Do nothing, to be overridden by tests if they need something specific
}
protected void stopBroker() {
for (HelixBrokerStarter brokerStarter : _brokerStarters) {
try {
- BrokerTestUtils.stopBroker(brokerStarter);
+ // TODO: replace with brokerStarter.shutdown() once they are hooked up
+ brokerStarter.getBrokerServerBuilder().stop();
} catch (Exception e) {
LOGGER.error("Encountered exception while stopping broker {}",
e.getMessage());
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java
index 5fb52cc..8f0045d 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java
@@ -43,7 +43,8 @@ public class NewConfigApplyIntegrationTest extends
BaseClusterIntegrationTest {
private static final Logger LOGGER =
LoggerFactory.getLogger(NewConfigApplyIntegrationTest.class);
@BeforeClass
- public void setUp() {
+ public void setUp()
+ throws Exception {
// Start an empty cluster
startZk();
startController();
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
index 60bcf2c..52b046f 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
@@ -19,8 +19,8 @@
package org.apache.pinot.tools.admin.command;
import java.io.File;
+import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.tools.Command;
@@ -107,23 +107,21 @@ public class StartBrokerCommand extends
AbstractBaseAdminCommand implements Comm
public boolean execute()
throws Exception {
try {
- Configuration configuration = readConfigFromFile(_configFileName);
- if (configuration == null) {
+ Configuration brokerConf = readConfigFromFile(_configFileName);
+ if (brokerConf == null) {
if (_configFileName != null) {
LOGGER.error("Error: Unable to find file {}.", _configFileName);
return false;
}
- configuration = new PropertiesConfiguration();
-
configuration.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT,
_brokerPort);
- configuration.setProperty("pinot.broker.routing.table.builder.class",
"random");
+ brokerConf = new BaseConfiguration();
+ brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT,
_brokerPort);
}
LOGGER.info("Executing command: " + toString());
- final HelixBrokerStarter pinotHelixBrokerStarter =
- new HelixBrokerStarter(_brokerHost, _clusterName, _zkAddress,
configuration);
+ new HelixBrokerStarter(_brokerHost, _clusterName, _zkAddress,
brokerConf);
- String pidFile = ".pinotAdminBroker-" +
String.valueOf(System.currentTimeMillis()) + ".pid";
+ String pidFile = ".pinotAdminBroker-" + System.currentTimeMillis() +
".pid";
savePID(System.getProperty("java.io.tmpdir") + File.separator + pidFile);
return true;
} catch (Exception e) {
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
index 77e1edc..1df27e5 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
@@ -35,6 +35,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -215,12 +216,12 @@ public class PerfBenchmarkDriver {
LOGGER.info("Skipping start broker step. Assumes broker is already
started.");
return;
}
- Configuration brokerConfiguration = new PropertiesConfiguration();
+ Configuration brokerConf = new BaseConfiguration();
String brokerInstanceName = "Broker_localhost_" +
CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT;
-
brokerConfiguration.setProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
brokerInstanceName);
-
brokerConfiguration.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS,
BROKER_TIMEOUT_MS);
+ brokerConf.setProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
brokerInstanceName);
+ brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS,
BROKER_TIMEOUT_MS);
LOGGER.info("Starting broker instance: {}", brokerInstanceName);
- new HelixBrokerStarter(_clusterName, _zkAddress, brokerConfiguration);
+ new HelixBrokerStarter(_clusterName, _zkAddress, brokerConf);
}
private void startServer()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]