jackjlli commented on a change in pull request #4100: Refactor
HelixBrokerStarter to separate constructor and start()
URL: https://github.com/apache/incubator-pinot/pull/4100#discussion_r274088756
##########
File path:
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
##########
@@ -47,189 +48,196 @@
import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.common.utils.ServiceStatus;
-import org.apache.pinot.common.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Helix Broker Startable
- *
- *
- */
public class HelixBrokerStarter {
- private static final String PROPERTY_STORE = "PROPERTYSTORE";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HelixBrokerStarter.class);
+ private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY =
"pinot.broker.routing.table";
+
+ private final Configuration _properties;
+ private final String _clusterName;
+ private final String _zkServers;
+ private final String _brokerId;
// Spectator Helix manager handles the custom change listeners, properties
read/write
- private final HelixManager _spectatorHelixManager;
+ private HelixManager _spectatorHelixManager;
// Participant Helix manager handles Helix functionality such as state
transitions and messages
- private final HelixManager _participantHelixManager;
-
- private final Configuration _pinotHelixProperties;
- private final HelixAdmin _helixAdmin;
- private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
- private final HelixDataAccessor _helixDataAccessor;
- private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
- private final BrokerServerBuilder _brokerServerBuilder;
- private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
- private final MetricsRegistry _metricsRegistry;
- private final TableQueryQuotaManager _tableQueryQuotaManager;
- private final ClusterChangeMediator _clusterChangeMediator;
- private final TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler;
-
- // Set after broker is started, which is actually in the constructor.
- private AccessControlFactory _accessControlFactory;
+ private HelixManager _participantHelixManager;
- private static final Logger LOGGER =
LoggerFactory.getLogger(HelixBrokerStarter.class);
-
- private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY =
"pinot.broker.routing.table";
+ private HelixAdmin _helixAdmin;
+ private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+ private HelixDataAccessor _helixDataAccessor;
+ private HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
+ private BrokerServerBuilder _brokerServerBuilder;
+ private AccessControlFactory _accessControlFactory;
+ private MetricsRegistry _metricsRegistry;
+ private ClusterChangeMediator _clusterChangeMediator;
+ private TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler;
- public HelixBrokerStarter(String helixClusterName, String zkServer,
Configuration pinotHelixProperties)
+ public HelixBrokerStarter(Configuration properties, String clusterName,
String zkServer)
throws Exception {
- this(null, helixClusterName, zkServer, pinotHelixProperties);
+ this(properties, clusterName, zkServer, null);
}
- public HelixBrokerStarter(String brokerHost, String helixClusterName, String
zkServer,
- Configuration pinotHelixProperties)
+ public HelixBrokerStarter(Configuration properties, String clusterName,
String zkServer, @Nullable String brokerHost)
throws Exception {
- LOGGER.info("Starting Pinot broker");
+ _properties = properties;
+ setupHelixSystemProperties();
- _pinotHelixProperties =
DefaultHelixBrokerConfig.getDefaultBrokerConf(pinotHelixProperties);
+ _clusterName = clusterName;
+
+ // Remove all white-spaces from the list of zkServers (if any).
+ _zkServers = zkServer.replaceAll("\\s+", "");
if (brokerHost == null) {
brokerHost = NetUtil.getHostAddress();
}
-
- final String brokerId =
_pinotHelixProperties.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
- CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" +
_pinotHelixProperties
+ _brokerId =
_properties.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
+ CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" +
_properties
.getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT,
CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT));
+ _properties.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID,
_brokerId);
+ }
-
_pinotHelixProperties.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID,
brokerId);
- setupHelixSystemProperties();
+ private void setupHelixSystemProperties() {
+ System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _properties
+
.getString(CommonConstants.Broker.CONFIG_OF_HELIX_FLAPPING_TIME_WINDOW_MS,
+ CommonConstants.Broker.DEFAULT_HELIX_FLAPPING_TIME_WINDOW_MS));
+ }
- // Remove all white-spaces from the list of zkServers (if any).
- String zkServers = zkServer.replaceAll("\\s+", "");
+ public void start()
+ throws Exception {
+ LOGGER.info("Starting Pinot broker");
- LOGGER.info("Connecting Helix components");
- // Connect spectator Helix manager.
+ // Connect the spectator Helix manager
+ LOGGER.info("Connecting spectator Helix manager");
_spectatorHelixManager =
- HelixManagerFactory.getZKHelixManager(helixClusterName, brokerId,
InstanceType.SPECTATOR, zkServers);
+ HelixManagerFactory.getZKHelixManager(_clusterName, _brokerId,
InstanceType.SPECTATOR, _zkServers);
_spectatorHelixManager.connect();
_helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
_propertyStore = _spectatorHelixManager.getHelixPropertyStore();
_helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
_helixExternalViewBasedRouting = new
HelixExternalViewBasedRouting(_propertyStore, _spectatorHelixManager,
- pinotHelixProperties.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
- _tableQueryQuotaManager = new
TableQueryQuotaManager(_spectatorHelixManager);
- _liveInstanceChangeHandler = new
LiveInstanceChangeHandler(_spectatorHelixManager);
- _brokerServerBuilder = startBroker(_pinotHelixProperties);
+ _properties.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
+ TableQueryQuotaManager tableQueryQuotaManager = new
TableQueryQuotaManager(_spectatorHelixManager);
+ LiveInstanceChangeHandler liveInstanceChangeHandler = new
LiveInstanceChangeHandler(_spectatorHelixManager);
+
+ // Set up the broker server builder
+ LOGGER.info("Setting up broker server builder");
+ _brokerServerBuilder = new BrokerServerBuilder(_properties,
_helixExternalViewBasedRouting,
+ _helixExternalViewBasedRouting.getTimeBoundaryService(),
liveInstanceChangeHandler, tableQueryQuotaManager);
+ _accessControlFactory = _brokerServerBuilder.getAccessControlFactory();
_metricsRegistry = _brokerServerBuilder.getMetricsRegistry();
+ BrokerMetrics brokerMetrics = _brokerServerBuilder.getBrokerMetrics();
+ _helixExternalViewBasedRouting.setBrokerMetrics(brokerMetrics);
+ tableQueryQuotaManager.setBrokerMetrics(brokerMetrics);
Review comment:
Can you elaborate on why tableQueryQuotaManager is converted to a local variable
whereas _helixExternalViewBasedRouting remains a member field?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]