This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0648f77 In ServiceStartable, apply global cluster config in ZK to
instance config (#7593)
0648f77 is described below
commit 0648f77c6d0bff6d468110fe9166cec8f280773f
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Oct 20 11:41:41 2021 -0700
In ServiceStartable, apply global cluster config in ZK to instance config
(#7593)
When starting a component, read the global cluster config from the ZK and
apply it to the instance config so that the common configs can be put in the
cluster config and simplify the per instance configuration.
---
.../broker/broker/helix/BaseBrokerStarter.java | 62 ++++-----------
.../broker/broker/HelixBrokerStarterTest.java | 30 +++++---
.../pinot/common/utils/ServiceStartableUtils.java | 90 ++++++++++++++++++++++
.../pinot/controller/BaseControllerStarter.java | 8 +-
.../org/apache/pinot/minion/BaseMinionStarter.java | 7 +-
.../server/starter/helix/BaseServerStarter.java | 8 +-
.../apache/pinot/spi/env/PinotConfiguration.java | 4 -
7 files changed, 142 insertions(+), 67 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 1e87bdf..238056a 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -21,7 +21,6 @@ package org.apache.pinot.broker.broker.helix;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -34,11 +33,9 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.ZNRecord;
-import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.broker.BrokerAdminApiApplication;
@@ -52,6 +49,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.PinotMetricUtils;
+import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
@@ -108,13 +106,13 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
public void init(PinotConfiguration brokerConf)
throws Exception {
_brokerConf = brokerConf;
- _listenerConfigs = ListenerConfigUtil.buildBrokerConfigs(brokerConf);
- setupHelixSystemProperties();
-
- _clusterName = brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME);
-
// Remove all white-spaces from the list of zkServers (if any).
_zkServers =
brokerConf.getProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER).replaceAll("\\s+", "");
+ _clusterName = brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME);
+ ServiceStartableUtils.applyClusterConfig(_brokerConf, _zkServers,
_clusterName, ServiceRole.BROKER);
+
+ setupHelixSystemProperties();
+ _listenerConfigs = ListenerConfigUtil.buildBrokerConfigs(brokerConf);
_hostname = brokerConf.getProperty(Broker.CONFIG_OF_BROKER_HOSTNAME);
if (_hostname == null) {
_hostname =
@@ -199,35 +197,6 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
_propertyStore = _spectatorHelixManager.getHelixPropertyStore();
_helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
- // Fetch cluster level config from ZK
- HelixConfigScope helixConfigScope =
- new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_clusterName).build();
- Map<String, String> configMap = _helixAdmin.getConfig(helixConfigScope,
Arrays
- .asList(Helix.ENABLE_CASE_INSENSITIVE_KEY,
Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY,
- Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE,
Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY,
- Helix.ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY));
-
- boolean caseInsensitive =
Boolean.parseBoolean(configMap.get(Helix.ENABLE_CASE_INSENSITIVE_KEY)) ||
Boolean
-
.parseBoolean(configMap.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY));
-
- String log2mStr = configMap.get(Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY);
- if (log2mStr != null) {
- try {
- _brokerConf.setProperty(Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY,
Integer.parseInt(log2mStr));
- } catch (NumberFormatException e) {
- LOGGER.warn("Invalid config of '{}': '{}', using: {} as the default
log2m for HyperLogLog",
- Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY, log2mStr,
Helix.DEFAULT_HYPERLOGLOG_LOG2M);
- }
- }
-
- if
(Boolean.parseBoolean(configMap.get(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE)))
{
- _brokerConf.setProperty(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE,
true);
- }
-
- if
(Boolean.parseBoolean(configMap.get(Helix.ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY)))
{
- _brokerConf.setProperty(Helix.ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY,
true);
- }
-
LOGGER.info("Setting up broker request handler");
// Set up metric registry and broker metrics
PinotConfiguration metricsConfiguration =
_brokerConf.subset(Broker.METRICS_CONFIG_PREFIX);
@@ -251,6 +220,9 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
QueryRewriterFactory.init(_brokerConf.getProperty(Broker.CONFIG_OF_BROKER_QUERY_REWRITER_CLASS_NAMES));
// Initialize FunctionRegistry before starting the broker request handler
FunctionRegistry.init();
+ boolean caseInsensitive =
+ _brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, false) ||
_brokerConf.getProperty(
+ Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY, false);
TableCache tableCache = new TableCache(_propertyStore, caseInsensitive);
// Configure TLS for netty connection to server
TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf,
Broker.BROKER_TLS_PREFIX);
@@ -312,10 +284,10 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
new BrokerUserDefinedMessageHandlerFactory(_routingManager,
queryQuotaManager));
_participantHelixManager.connect();
updateInstanceConfigIfNeeded();
- _brokerMetrics
- .addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME, () ->
_participantHelixManager.isConnected() ? 1L : 0L);
- _participantHelixManager
- .addPreConnectCallback(() ->
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS,
1L));
+ _brokerMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME,
+ () -> _participantHelixManager.isConnected() ? 1L : 0L);
+ _participantHelixManager.addPreConnectCallback(
+ () ->
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS,
1L));
// Register the service status handler
registerServiceStatusHandler();
@@ -355,8 +327,8 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
}
}
- double minResourcePercentForStartup = _brokerConf
- .getProperty(Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START,
+ double minResourcePercentForStartup =
+
_brokerConf.getProperty(Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START,
Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START);
LOGGER.info("Registering service status handler");
@@ -381,8 +353,8 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
// been disconnected, so instance should disappear from ExternalView soon
and stop getting new queries.
long delayShutdownTimeMs =
_brokerConf.getProperty(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS,
Broker.DEFAULT_DELAY_SHUTDOWN_TIME_MS);
- LOGGER
- .info("Wait for {}ms before shutting down request handler to finish
the pending queries", delayShutdownTimeMs);
+ LOGGER.info("Wait for {}ms before shutting down request handler to finish
the pending queries",
+ delayShutdownTimeMs);
try {
Thread.sleep(delayShutdownTimeMs);
} catch (Exception e) {
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 0366e09..b42fdf2 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -80,6 +80,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
properties.put(Helix.KEY_OF_BROKER_QUERY_PORT, 18099);
properties.put(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
properties.put(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
+ properties.put(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, true);
properties.put(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
_brokerStarter = new HelixBrokerStarter();
@@ -90,9 +91,9 @@ public class HelixBrokerStarterTest extends ControllerTest {
addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVERS, true);
Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
- .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.INT,
- new DateTimeFormatSpec(1, TimeUnit.DAYS.toString(),
DateTimeFieldSpec.TimeFormat.EPOCH.toString())
- .getFormat(), new DateTimeGranularitySpec(1,
TimeUnit.DAYS).getGranularity()).build();
+ .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.INT, new
DateTimeFormatSpec(1, TimeUnit.DAYS.toString(),
+ DateTimeFieldSpec.TimeFormat.EPOCH.toString()).getFormat(),
+ new DateTimeGranularitySpec(1,
TimeUnit.DAYS).getGranularity()).build();
_helixResourceManager.addSchema(schema, true);
TableConfig offlineTableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
@@ -104,9 +105,8 @@ public class HelixBrokerStarterTest extends ControllerTest {
_helixResourceManager.addTable(realtimeTimeConfig);
for (int i = 0; i < NUM_OFFLINE_SEGMENTS; i++) {
- _helixResourceManager
- .addNewSegment(OFFLINE_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME),
- "downloadUrl");
+ _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME),
"downloadUrl");
}
TestUtils.waitForCondition(aVoid -> {
@@ -122,12 +122,22 @@ public class HelixBrokerStarterTest extends
ControllerTest {
streamConfigs.put("streamType", "kafka");
streamConfigs.put("stream.kafka.consumer.type", "highLevel");
streamConfigs.put("stream.kafka.topic.name", "kafkaTopic");
- streamConfigs
- .put("stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder");
+ streamConfigs.put("stream.kafka.decoder.class.name",
+ "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder");
return streamConfigs;
}
@Test
+ public void testClusterConfigOverride() {
+ PinotConfiguration config = _brokerStarter.getConfig();
+ assertTrue(config.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, false));
+ assertEquals(config.getProperty(Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY, 0),
12);
+
+ // NOTE: It is disabled in cluster config, but enabled in instance config.
Instance config should take precedence.
+
assertTrue(config.getProperty(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE,
false));
+ }
+
+ @Test
public void testResourceAndTagAssignment()
throws Exception {
assertEquals(
@@ -156,8 +166,8 @@ public class HelixBrokerStarterTest extends ControllerTest {
assertTrue(routingTable.getUnavailableSegments().isEmpty());
// Add a new segment into the OFFLINE table
- _helixResourceManager
- .addNewSegment(OFFLINE_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), "downloadUrl");
+ _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME),
"downloadUrl");
TestUtils.waitForCondition(aVoid ->
routingManager.getRoutingTable(brokerRequest).getServerInstanceToSegmentsMap().values().iterator().next().size()
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java
new file mode 100644
index 0000000..dbc8448
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.services.ServiceRole;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ServiceStartableUtils {
+ private ServiceStartableUtils() {
+ }
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ServiceStartableUtils.class);
+ private static final String CLUSTER_CONFIG_ZK_PATH_TEMPLATE =
"/%s/CONFIGS/CLUSTER/%s";
+ private static final String PINOT_ALL_CONFIG_KEY_PREFIX = "pinot.all.";
+ private static final String PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE =
"pinot.%s.";
+ private static final int ZK_TIMEOUT_MS = 30_000;
+
+ /**
+ * Applies the ZK cluster config to the given instance config if it does not
already exist.
+ *
+ * In the ZK cluster config:
+ * - pinot.all.* will be replaced to role specific config, e.g.
pinot.controller.* for controllers
+ */
+ public static void applyClusterConfig(PinotConfiguration instanceConfig,
String zkAddress, String clusterName,
+ ServiceRole serviceRole) {
+ HelixZkClient.ZkClientConfig zkClientConfig = new
HelixZkClient.ZkClientConfig();
+ zkClientConfig.setZkSerializer(new
ZNRecordSerializer()).setConnectInitTimeout(ZK_TIMEOUT_MS);
+ HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
zkClientConfig);
+ zkClient.waitUntilConnected(ZK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+ try {
+ ZNRecord clusterConfigZNRecord =
+ zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE,
clusterName, clusterName), true);
+ if (clusterConfigZNRecord == null) {
+ LOGGER.warn("Failed to find cluster config for cluster: {}, skipping
applying cluster config", clusterName);
+ return;
+ }
+
+ Map<String, String> clusterConfigs =
clusterConfigZNRecord.getSimpleFields();
+ String instanceConfigKeyPrefix =
+ String.format(PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE,
serviceRole.name().toLowerCase());
+ for (Map.Entry<String, String> entry : clusterConfigs.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (key.startsWith(PINOT_ALL_CONFIG_KEY_PREFIX)) {
+ String instanceConfigKey = instanceConfigKeyPrefix +
key.substring(PINOT_ALL_CONFIG_KEY_PREFIX.length());
+ addConfigIfNotExists(instanceConfig, instanceConfigKey, value);
+ } else {
+ // TODO: Currently it puts all keys to the instance config. Consider
standardizing instance config keys and
+ // only put keys with the instance config key prefix.
+ addConfigIfNotExists(instanceConfig, key, value);
+ }
+ }
+ } finally {
+ zkClient.close();
+ }
+ }
+
+ private static void addConfigIfNotExists(PinotConfiguration instanceConfig,
String key, String value) {
+ if (!instanceConfig.containsKey(key)) {
+ instanceConfig.setProperty(key, value);
+ }
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index a75027a..24e8f1c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -56,6 +56,7 @@ import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.PinotMetricUtils;
import org.apache.pinot.common.metrics.ValidationMetrics;
+import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.common.utils.helix.HelixHelper;
@@ -156,12 +157,13 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
public void init(PinotConfiguration pinotConfiguration)
throws Exception {
_config = new ControllerConf(pinotConfiguration.toMap());
+ _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(_config.getZkStr());
+ _helixClusterName = _config.getHelixClusterName();
+ ServiceStartableUtils.applyClusterConfig(_config, _helixZkURL,
_helixClusterName, ServiceRole.CONTROLLER);
+
setupHelixSystemProperties();
_listenerConfigs = ListenerConfigUtil.buildControllerConfigs(_config);
_controllerMode = _config.getControllerMode();
- // Helix related settings.
- _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(_config.getZkStr());
- _helixClusterName = _config.getHelixClusterName();
inferHostnameIfNeeded(_config);
_hostname = _config.getControllerHost();
_port = _listenerConfigs.get(0).getPort();
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
index 99f9acc..aa55d1e 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
@@ -37,6 +37,7 @@ import org.apache.pinot.common.metrics.MinionMeter;
import org.apache.pinot.common.metrics.MinionMetrics;
import org.apache.pinot.common.metrics.PinotMetricUtils;
import org.apache.pinot.common.utils.ClientSSLContextGenerator;
+import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.common.utils.helix.HelixHelper;
@@ -84,8 +85,11 @@ public abstract class BaseMinionStarter implements
ServiceStartable {
public void init(PinotConfiguration config)
throws Exception {
_config = new MinionConf(config.toMap());
- String helixClusterName = _config.getHelixClusterName();
String zkAddress = _config.getZkAddress();
+ String helixClusterName = _config.getHelixClusterName();
+ ServiceStartableUtils.applyClusterConfig(_config, zkAddress,
helixClusterName, ServiceRole.MINION);
+
+ setupHelixSystemProperties();
_hostname = _config.getHostName();
_port = _config.getPort();
_instanceId = _config.getInstanceId();
@@ -97,7 +101,6 @@ public abstract class BaseMinionStarter implements
ServiceStartable {
_instanceId = CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE +
_hostname + "_" + _port;
}
_listenerConfigs = ListenerConfigUtil.buildMinionAdminConfigs(_config);
- setupHelixSystemProperties();
_helixManager = new ZKHelixManager(helixClusterName, _instanceId,
InstanceType.PARTICIPANT, zkAddress);
MinionTaskZkMetadataManager minionTaskZkMetadataManager = new
MinionTaskZkMetadataManager(_helixManager);
_taskExecutorFactoryRegistry = new
TaskExecutorFactoryRegistry(minionTaskZkMetadataManager, _config);
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 4e8ab07..92c315b 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -51,6 +51,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SystemResourceInfo;
+import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.ServiceStatus.Status;
import org.apache.pinot.common.utils.config.TagNameUtils;
@@ -135,10 +136,12 @@ public abstract class BaseServerStarter implements
ServiceStartable {
throws Exception {
// Make a clone so that changes to the config won't propagate to the caller
_serverConf = serverConf.clone();
- _listenerConfigs = ListenerConfigUtil.buildServerAdminConfigs(_serverConf);
- _helixClusterName =
_serverConf.getProperty(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME);
_zkAddress =
_serverConf.getProperty(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER);
+ _helixClusterName =
_serverConf.getProperty(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME);
+ ServiceStartableUtils.applyClusterConfig(_serverConf, _zkAddress,
_helixClusterName, ServiceRole.SERVER);
+ setupHelixSystemProperties();
+ _listenerConfigs = ListenerConfigUtil.buildServerAdminConfigs(_serverConf);
_hostname = _serverConf.getProperty(Helix.KEY_OF_SERVER_NETTY_HOST,
_serverConf.getProperty(Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false)
? NetUtils.getHostnameOrAddress()
: NetUtils.getHostAddress());
@@ -368,7 +371,6 @@ public abstract class BaseServerStarter implements
ServiceStartable {
LOGGER.info("Initializing Helix manager with zkAddress: {}, clusterName:
{}, instanceId: {}", _zkAddress,
_helixClusterName, _instanceId);
- setupHelixSystemProperties();
_helixManager =
HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId,
InstanceType.PARTICIPANT, _zkAddress);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/env/PinotConfiguration.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/env/PinotConfiguration.java
index 4eab1de..4a2bafd 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/env/PinotConfiguration.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/env/PinotConfiguration.java
@@ -407,10 +407,6 @@ public class PinotConfiguration {
*
* @param name of the property to overwrite in memory. Applies relaxed
binding on the property name.
* @param value to overwrite in memory
- *
- * @deprecated Configurations should be immutable. Prefer creating a new
{@link #PinotConfiguration} with base
- * properties to overwrite
- * properties.
*/
public void setProperty(String name, Object value) {
_configuration.setProperty(relaxPropertyName(name), value);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]