This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch cluster_change_register in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit ebd5226e082cf05da04f8c1e04ddac14d0380885 Author: Jackie (Xiaotian) Jiang <[email protected]> AuthorDate: Mon Apr 8 15:51:30 2019 -0700 In HelixBrokerStarter, allow custom cluster change handlers Motivation: all cluster change listeners should be added through the ClusterChangeMediater in order to perform dedup and proactive check Add ClusterChangeHandler interface Add methods in HelixBrokerStarter to allow custom cluster change handlers to be plugged in Split the responsibility of helix managers on broker side - Spectator: handles the custom change listeners, properties read/write - Participant: handles Helix functionality such as state transitions and messages --- ...okerResourceOnlineOfflineStateModelFactory.java | 30 ++--- .../broker/broker/helix/ClusterChangeHandler.java | 8 +- .../broker/broker/helix/ClusterChangeMediator.java | 32 +++-- .../broker/helix/ExternalViewChangeHandler.java | 43 ------- .../broker/broker/helix/HelixBrokerStarter.java | 129 ++++++++++++++------- .../broker/helix/InstanceConfigChangeHandler.java | 38 ------ .../broker/helix/LiveInstanceChangeHandler.java | 7 +- .../broker/queryquota/TableQueryQuotaManager.java | 12 +- .../routing/HelixExternalViewBasedRouting.java | 16 ++- 9 files changed, 155 insertions(+), 160 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java index 1916d86..b858870 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java @@ -18,14 +18,9 @@ */ package org.apache.pinot.broker.broker.helix; -import java.util.List; -import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixManager; import org.apache.helix.NotificationContext; -import org.apache.helix.PropertyKey.Builder; import org.apache.helix.ZNRecord; -import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Message; import org.apache.helix.participant.statemachine.StateModel; import org.apache.helix.participant.statemachine.StateModelFactory; @@ -37,7 +32,6 @@ import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting; import org.apache.pinot.common.Utils; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.metadata.ZKMetadataProvider; -import org.apache.pinot.common.utils.helix.HelixHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,19 +47,16 @@ import static org.apache.pinot.common.utils.CommonConstants.Helix.BROKER_RESOURC public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> { private static final Logger LOGGER = LoggerFactory.getLogger(BrokerResourceOnlineOfflineStateModelFactory.class); - private final HelixManager _helixManager; - private final HelixAdmin _helixAdmin; + private final ZkHelixPropertyStore<ZNRecord> _propertyStore; + private final HelixDataAccessor _dataAccessor; private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting; private final TableQueryQuotaManager _tableQueryQuotaManager; - private ZkHelixPropertyStore<ZNRecord> _propertyStore; - - public BrokerResourceOnlineOfflineStateModelFactory(HelixManager helixManager, - ZkHelixPropertyStore<ZNRecord> propertyStore, HelixExternalViewBasedRouting helixExternalViewBasedRouting, + public BrokerResourceOnlineOfflineStateModelFactory(ZkHelixPropertyStore<ZNRecord> propertyStore, + HelixDataAccessor helixDataAccessor, HelixExternalViewBasedRouting helixExternalViewBasedRouting, TableQueryQuotaManager tableQueryQuotaManager) { - _helixManager = helixManager; + _dataAccessor = helixDataAccessor; _propertyStore = propertyStore; - _helixAdmin = helixManager.getClusterManagmentTool(); _helixExternalViewBasedRouting = helixExternalViewBasedRouting; _tableQueryQuotaManager = tableQueryQuotaManager; } @@ -86,17 +77,14 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { try { LOGGER.info("BrokerResourceOnlineOfflineStateModel.onBecomeOnlineFromOffline() : " + message); - Builder keyBuilder = _helixManager.getHelixDataAccessor().keyBuilder(); String tableName = message.getPartitionName(); - HelixDataAccessor helixDataAccessor = _helixManager.getHelixDataAccessor(); - List<InstanceConfig> instanceConfigList = helixDataAccessor.getChildValues(keyBuilder.instanceConfigs()); TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableName); _helixExternalViewBasedRouting.markDataResourceOnline(tableConfig, - HelixHelper.getExternalViewForResource(_helixAdmin, _helixManager.getClusterName(), tableName), - instanceConfigList); - _tableQueryQuotaManager.initTableQueryQuota(tableConfig, HelixHelper - .getExternalViewForResource(_helixAdmin, _helixManager.getClusterName(), BROKER_RESOURCE_INSTANCE)); + _dataAccessor.getProperty(_dataAccessor.keyBuilder().externalView(tableName)), + _dataAccessor.getChildValues(_dataAccessor.keyBuilder().instanceConfigs())); + _tableQueryQuotaManager.initTableQueryQuota(tableConfig, + _dataAccessor.getProperty(_dataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE))); } catch (Exception e) { LOGGER.error("Caught exception during OFFLINE -> ONLINE transition", e); Utils.rethrowException(e); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java index e0a3e6c..d4b82da 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java @@ -18,10 +18,16 @@ */ package org.apache.pinot.broker.broker.helix; +import org.apache.helix.HelixConstants; + + /** * Handles cluster changes such as external view changes, instance config changes, live instance changes etc. */ public interface ClusterChangeHandler { - void processClusterChange(); + /** + * Processes the cluster change of the given type (e.g. EXTERNAL_VIEW, INSTANCE_CONFIG, LIVE_INSTANCE). + */ + void processClusterChange(HelixConstants.ChangeType changeType); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java index 0653ddc..d9ffcb8 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java @@ -56,7 +56,7 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan // If no change got for 1 hour, proactively check changes private static final long PROACTIVE_CHANGE_CHECK_INTERVAL_MS = 3600 * 1000L; - private final Map<ChangeType, ClusterChangeHandler> _changeHandlerMap; + private final Map<ChangeType, List<ClusterChangeHandler>> _changeHandlersMap; private final Map<ChangeType, Long> _lastChangeTimeMap = new HashMap<>(); private final Map<ChangeType, Long> _lastProcessTimeMap = new HashMap<>(); @@ -64,12 +64,13 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan private volatile boolean _stopped = false; - public ClusterChangeMediator(Map<ChangeType, ClusterChangeHandler> changeHandlerMap, BrokerMetrics brokerMetrics) { - _changeHandlerMap = changeHandlerMap; + public ClusterChangeMediator(Map<ChangeType, List<ClusterChangeHandler>> changeHandlersMap, + BrokerMetrics brokerMetrics) { + _changeHandlersMap = changeHandlersMap; // Initialize last process time map long initTime = System.currentTimeMillis(); - for (ChangeType changeType : changeHandlerMap.keySet()) { + for (ChangeType changeType : changeHandlersMap.keySet()) { _lastProcessTimeMap.put(changeType, initTime); } @@ -78,9 +79,9 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan public void run() { while (!_stopped) { try { - for (Map.Entry<ChangeType, ClusterChangeHandler> entry : _changeHandlerMap.entrySet()) { + for (Map.Entry<ChangeType, List<ClusterChangeHandler>> entry : _changeHandlersMap.entrySet()) { ChangeType changeType = entry.getKey(); - ClusterChangeHandler changeHandler = entry.getValue(); + List<ClusterChangeHandler> changeHandlers = entry.getValue(); long currentTime = System.currentTimeMillis(); Long lastChangeTime; synchronized (_lastChangeTimeMap) { @@ -89,13 +90,13 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan if (lastChangeTime != null) { brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTime - lastChangeTime, TimeUnit.MILLISECONDS); - processClusterChange(changeType, changeHandler); + processClusterChange(changeType, changeHandlers); } else { long lastProcessTime = _lastProcessTimeMap.get(changeType); if (currentTime - lastProcessTime > PROACTIVE_CHANGE_CHECK_INTERVAL_MS) { LOGGER.info("Proactive check {} change", changeType); brokerMetrics.addMeteredGlobalValue(BrokerMeter.PROACTIVE_CLUSTER_CHANGE_CHECK, 1L); - processClusterChange(changeType, changeHandler); + processClusterChange(changeType, changeHandlers); } } } @@ -116,11 +117,18 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan }; } - private void processClusterChange(ChangeType changeType, ClusterChangeHandler changeHandler) { + private void processClusterChange(ChangeType changeType, List<ClusterChangeHandler> changeHandlers) { long startTime = System.currentTimeMillis(); LOGGER.info("Start processing {} change", changeType); - changeHandler.processClusterChange(); - long endTime = System.currentTimeMillis(); + long handlerStartTime = startTime; + for (ClusterChangeHandler changeHandler : changeHandlers) { + changeHandler.processClusterChange(changeType); + long handlerEndTime = System.currentTimeMillis(); + LOGGER.info("Finish handling {} change for handler: {} in {}ms", changeType, changeHandler.getClass().getName(), + handlerEndTime - handlerStartTime); + handlerStartTime = handlerEndTime; + } + long endTime = handlerStartTime; LOGGER.info("Finish processing {} change in {}ms", changeType, endTime - startTime); _lastProcessTimeMap.put(changeType, endTime); } @@ -191,7 +199,7 @@ public class ClusterChangeMediator implements ExternalViewChangeListener, Instan } } else { LOGGER.error("Cluster change handling thread is not alive, directly process the {} change", changeType); - processClusterChange(changeType, _changeHandlerMap.get(changeType)); + processClusterChange(changeType, _changeHandlersMap.get(changeType)); } } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ExternalViewChangeHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ExternalViewChangeHandler.java deleted file mode 100644 index 1422953..0000000 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ExternalViewChangeHandler.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.broker.broker.helix; - -import org.apache.pinot.broker.queryquota.TableQueryQuotaManager; -import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting; - - -/** - * Cluster change handler for external view changes. - */ -public class ExternalViewChangeHandler implements ClusterChangeHandler { - private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting; - private final TableQueryQuotaManager _tableQueryQuotaManager; - - public ExternalViewChangeHandler(HelixExternalViewBasedRouting helixExternalViewBasedRouting, - TableQueryQuotaManager tableQueryQuotaManager) { - _helixExternalViewBasedRouting = helixExternalViewBasedRouting; - _tableQueryQuotaManager = tableQueryQuotaManager; - } - - @Override - public void processClusterChange() { - _helixExternalViewBasedRouting.processExternalViewChange(); - _tableQueryQuotaManager.processQueryQuotaChange(); - } -} diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java index ce76f22..337a6c3 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java @@ -21,19 +21,19 @@ package org.apache.pinot.broker.broker.helix; import com.google.common.collect.ImmutableList; import com.yammer.metrics.core.MetricsRegistry; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.lang.StringUtils; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixConstants.ChangeType; +import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; -import org.apache.helix.PreConnectCallback; import org.apache.helix.ZNRecord; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Message; @@ -61,16 +61,19 @@ import org.slf4j.LoggerFactory; * */ public class HelixBrokerStarter { - private static final String PROPERTY_STORE = "PROPERTYSTORE"; + // Spectator Helix manager handles the custom change listeners, properties read/write private final HelixManager _spectatorHelixManager; - private final HelixManager _helixManager; - private final HelixAdmin _helixAdmin; + // Participant Helix manager handles Helix functionality such as state transitions and messages + private final HelixManager _participantHelixManager; + private final Configuration _pinotHelixProperties; + private final HelixAdmin _helixAdmin; + private final ZkHelixPropertyStore<ZNRecord> _propertyStore; + private final HelixDataAccessor _helixDataAccessor; private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting; private final BrokerServerBuilder _brokerServerBuilder; - private final ZkHelixPropertyStore<ZNRecord> _propertyStore; private final LiveInstanceChangeHandler _liveInstanceChangeHandler; private final MetricsRegistry _metricsRegistry; private final TableQueryQuotaManager _tableQueryQuotaManager; @@ -117,66 +120,68 @@ public class HelixBrokerStarter { _spectatorHelixManager.connect(); _helixAdmin = _spectatorHelixManager.getClusterManagmentTool(); _propertyStore = _spectatorHelixManager.getHelixPropertyStore(); + _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor(); _helixExternalViewBasedRouting = new HelixExternalViewBasedRouting(_propertyStore, _spectatorHelixManager, pinotHelixProperties.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY)); _tableQueryQuotaManager = new TableQueryQuotaManager(_spectatorHelixManager); _liveInstanceChangeHandler = new LiveInstanceChangeHandler(_spectatorHelixManager); - Map<ChangeType, ClusterChangeHandler> clusterChangeHandlerMap = new HashMap<>(); - clusterChangeHandlerMap.put(ChangeType.EXTERNAL_VIEW, - new ExternalViewChangeHandler(_helixExternalViewBasedRouting, _tableQueryQuotaManager)); - clusterChangeHandlerMap - .put(ChangeType.INSTANCE_CONFIG, new InstanceConfigChangeHandler(_helixExternalViewBasedRouting)); - clusterChangeHandlerMap.put(ChangeType.LIVE_INSTANCE, _liveInstanceChangeHandler); _brokerServerBuilder = startBroker(_pinotHelixProperties); _metricsRegistry = _brokerServerBuilder.getMetricsRegistry(); - _clusterChangeMediator = - new ClusterChangeMediator(clusterChangeHandlerMap, _brokerServerBuilder.getBrokerMetrics()); + + // Initialize cluster change mediator + Map<ChangeType, List<ClusterChangeHandler>> changeHandlersMap = new HashMap<>(); + List<ClusterChangeHandler> externalViewChangeHandlers = new ArrayList<>(); + externalViewChangeHandlers.add(_helixExternalViewBasedRouting); + externalViewChangeHandlers.add(_tableQueryQuotaManager); + externalViewChangeHandlers.addAll(getCustomExternalViewChangeHandlers(_spectatorHelixManager)); + changeHandlersMap.put(ChangeType.EXTERNAL_VIEW, externalViewChangeHandlers); + List<ClusterChangeHandler> instanceConfigChangeHandlers = new ArrayList<>(); + instanceConfigChangeHandlers.add(_helixExternalViewBasedRouting); + instanceConfigChangeHandlers.addAll(getCustomInstanceConfigChangeHandlers(_spectatorHelixManager)); + changeHandlersMap.put(ChangeType.INSTANCE_CONFIG, instanceConfigChangeHandlers); + List<ClusterChangeHandler> liveInstanceChangeHandler = new ArrayList<>(); + liveInstanceChangeHandler.add(_liveInstanceChangeHandler); + liveInstanceChangeHandler.addAll(getCustomLiveInstanceChangeHandlers(_spectatorHelixManager)); + changeHandlersMap.put(ChangeType.LIVE_INSTANCE, liveInstanceChangeHandler); + _clusterChangeMediator = new ClusterChangeMediator(changeHandlersMap, _brokerServerBuilder.getBrokerMetrics()); _clusterChangeMediator.start(); _spectatorHelixManager.addExternalViewChangeListener(_clusterChangeMediator); _spectatorHelixManager.addInstanceConfigChangeListener(_clusterChangeMediator); _spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator); // Connect participant Helix manager. - _helixManager = + _participantHelixManager = HelixManagerFactory.getZKHelixManager(helixClusterName, brokerId, InstanceType.PARTICIPANT, zkServers); - StateMachineEngine stateMachineEngine = _helixManager.getStateMachineEngine(); + StateMachineEngine stateMachineEngine = _participantHelixManager.getStateMachineEngine(); StateModelFactory<?> stateModelFactory = - new BrokerResourceOnlineOfflineStateModelFactory(_spectatorHelixManager, _propertyStore, + new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, _helixDataAccessor, _helixExternalViewBasedRouting, _tableQueryQuotaManager); stateMachineEngine .registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(), stateModelFactory); - _helixManager.connect(); + _participantHelixManager.connect(); _tbiMessageHandler = new TimeboundaryRefreshMessageHandlerFactory(_helixExternalViewBasedRouting, _pinotHelixProperties.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, CommonConstants.Broker.DEFAULT_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL_MS)); - _helixManager.getMessagingService() + _participantHelixManager.getMessagingService() .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), _tbiMessageHandler); addInstanceTagIfNeeded(helixClusterName, brokerId); - final double minResourcePercentForStartup = _pinotHelixProperties.getDouble(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START, - CommonConstants.Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START); // Register the service status handler + double minResourcePercentForStartup = _pinotHelixProperties + .getDouble(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START, + CommonConstants.Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START); ServiceStatus.setServiceStatusCallback(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList - .of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, helixClusterName, - brokerId, minResourcePercentForStartup), - new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, helixClusterName, - brokerId, minResourcePercentForStartup)))); + .of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_participantHelixManager, + helixClusterName, brokerId, minResourcePercentForStartup), + new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_participantHelixManager, + helixClusterName, brokerId, minResourcePercentForStartup)))); - _brokerServerBuilder.getBrokerMetrics().addCallbackGauge("helix.connected", new Callable<Long>() { - @Override - public Long call() - throws Exception { - return _helixManager.isConnected() ? 1L : 0L; - } - }); + _brokerServerBuilder.getBrokerMetrics() + .addCallbackGauge("helix.connected", () -> _participantHelixManager.isConnected() ? 1L : 0L); - _helixManager.addPreConnectCallback(new PreConnectCallback() { - @Override - public void onPreConnect() { - _brokerServerBuilder.getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L); - } - }); + _participantHelixManager.addPreConnectCallback(() -> _brokerServerBuilder.getBrokerMetrics() + .addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L)); } private void setupHelixSystemProperties() { @@ -187,7 +192,8 @@ public class HelixBrokerStarter { } private void addInstanceTagIfNeeded(String clusterName, String instanceName) { - InstanceConfig instanceConfig = _helixAdmin.getInstanceConfig(clusterName, instanceName); + InstanceConfig instanceConfig = + _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().instanceConfig(instanceName)); List<String> instanceTags = instanceConfig.getTags(); if (instanceTags == null || instanceTags.isEmpty()) { if (ZKMetadataProvider.getClusterTenantIsolationEnabled(_propertyStore)) { @@ -226,6 +232,45 @@ public class HelixBrokerStarter { return brokerServerBuilder; } + /** + * To be overridden to plug in custom external view change handlers. + * <p>NOTE: all change handlers will be run in a single thread, so any slow change handler can block other change + * handlers from running. For slow change handler, make it asynchronous. + * + * @param spectatorHelixManager Spectator Helix manager + * @return List of custom external view change handlers to plug in + */ + @SuppressWarnings("unused") + protected List<ClusterChangeHandler> getCustomExternalViewChangeHandlers(HelixManager spectatorHelixManager) { + return Collections.emptyList(); + } + + /** + * To be overridden to plug in custom instance config change handlers. + * <p>NOTE: all change handlers will be run in a single thread, so any slow change handler can block other change + * handlers from running. For slow change handler, make it asynchronous. + * + * @param spectatorHelixManager Spectator Helix manager + * @return List of custom instance config change handlers to plug in + */ + @SuppressWarnings("unused") + protected List<ClusterChangeHandler> getCustomInstanceConfigChangeHandlers(HelixManager spectatorHelixManager) { + return Collections.emptyList(); + } + + /** + * To be overridden to plug in custom live instance change handlers. + * <p>NOTE: all change handlers will be run in a single thread, so any slow change handler can block other change + * handlers from running. For slow change handler, make it asynchronous. + * + * @param spectatorHelixManager Spectator Helix manager + * @return List of custom live instance change handlers to plug in + */ + @SuppressWarnings("unused") + protected List<ClusterChangeHandler> getCustomLiveInstanceChangeHandlers(HelixManager spectatorHelixManager) { + return Collections.emptyList(); + } + public AccessControlFactory getAccessControlFactory() { return _accessControlFactory; } @@ -283,9 +328,9 @@ public class HelixBrokerStarter { public void shutdown() { LOGGER.info("Shutting down"); - if (_helixManager != null) { - LOGGER.info("Disconnecting Helix manager"); - _helixManager.disconnect(); + if (_participantHelixManager != null) { + LOGGER.info("Disconnecting participant Helix manager"); + _participantHelixManager.disconnect(); } if (_spectatorHelixManager != null) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/InstanceConfigChangeHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/InstanceConfigChangeHandler.java deleted file mode 100644 index bfba1af..0000000 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/InstanceConfigChangeHandler.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.broker.broker.helix; - -import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting; - - -/** - * Cluster change handler for instance config changes. - */ -public class InstanceConfigChangeHandler implements ClusterChangeHandler { - private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting; - - public InstanceConfigChangeHandler(HelixExternalViewBasedRouting helixExternalViewBasedRouting) { - _helixExternalViewBasedRouting = helixExternalViewBasedRouting; - } - - @Override - public void processClusterChange() { - _helixExternalViewBasedRouting.processInstanceConfigChange(); - } -} diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java index aaecb47..6bd1229 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java @@ -18,9 +18,11 @@ */ package org.apache.pinot.broker.broker.helix; +import com.google.common.base.Preconditions; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey; @@ -58,7 +60,10 @@ public class LiveInstanceChangeHandler implements ClusterChangeHandler { } @Override - public void processClusterChange() { + public void processClusterChange(HelixConstants.ChangeType changeType) { + Preconditions + .checkState(changeType == HelixConstants.ChangeType.LIVE_INSTANCE, "Illegal change type: " + changeType); + // Skip processing live instance change for single-connection routing if (_connectionPool == null) { return; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java index d37670e..13f24b8 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java @@ -19,15 +19,18 @@ package org.apache.pinot.broker.queryquota; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.util.concurrent.RateLimiter; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.helix.HelixConstants; import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; import org.apache.helix.model.ExternalView; import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.pinot.broker.broker.helix.ClusterChangeHandler; import org.apache.pinot.common.config.QuotaConfig; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.config.TableNameBuilder; @@ -43,7 +46,7 @@ import static org.apache.pinot.common.utils.CommonConstants.Helix.BROKER_RESOURC import static org.apache.pinot.common.utils.CommonConstants.Helix.TableType; -public class TableQueryQuotaManager { +public class TableQueryQuotaManager implements ClusterChangeHandler { private static final Logger LOGGER = LoggerFactory.getLogger(TableQueryQuotaManager.class); private BrokerMetrics _brokerMetrics; @@ -327,4 +330,11 @@ public class TableQueryQuotaManager { .info("Processed query quota change in {}ms, {} out of {} query quota configs rebuilt.", (endTime - startTime), numRebuilt, _rateLimiterMap.size()); } + + @Override + public void processClusterChange(HelixConstants.ChangeType changeType) { + Preconditions + .checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW, "Illegal change type: " + changeType); + processQueryQuotaChange(); + } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java index e62a7ef..51a9556 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java @@ -20,6 +20,7 @@ package org.apache.pinot.broker.routing; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.HashMap; @@ -32,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.commons.configuration.Configuration; import org.apache.helix.AccessOption; +import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey; @@ -39,6 +41,7 @@ import org.apache.helix.ZNRecord; import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.pinot.broker.broker.helix.ClusterChangeHandler; import org.apache.pinot.broker.routing.builder.RoutingTableBuilder; import org.apache.pinot.broker.routing.selector.SegmentSelector; import org.apache.pinot.broker.routing.selector.SegmentSelectorProvider; @@ -64,7 +67,7 @@ import org.slf4j.LoggerFactory; * of an offline routing table and a realtime routing table, with the realtime routing table being aware of the * fact that there is both an hlc and llc one. */ -public class HelixExternalViewBasedRouting implements RoutingTable { +public class HelixExternalViewBasedRouting implements ClusterChangeHandler, RoutingTable { private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedRouting.class); private final Map<String, RoutingTableBuilder> _routingTableBuilderMap; @@ -598,4 +601,15 @@ public class HelixExternalViewBasedRouting implements RoutingTable { return JsonUtils.objectToPrettyString(ret); } + + @Override + public void processClusterChange(HelixConstants.ChangeType changeType) { + Preconditions.checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW + || changeType == HelixConstants.ChangeType.INSTANCE_CONFIG, "Illegal change type: " + changeType); + if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) { + processExternalViewChange(); + } else { + processInstanceConfigChange(); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
