This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch add-logic-for-lead-controller-resource in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit d8c51fdbd0fffe3db199d5e9ce4db1e0b4a05113 Author: jackjlli <[email protected]> AuthorDate: Fri Jun 14 16:35:14 2019 -0700 Add logic for lead controller resource on controller side --- .../pinot/common/utils/helix/HelixHelper.java | 9 ++ .../controller/ControllerLeadershipManager.java | 117 --------------------- .../apache/pinot/controller/ControllerStarter.java | 69 ++++++------ .../pinot/controller/LeadControllerManager.java | 74 +++++++++++++ .../controller/LeadershipChangeSubscriber.java | 35 ------ .../PinotSegmentUploadRestletResource.java | 7 +- .../controller/api/upload/SegmentValidator.java | 10 +- .../controller/helix/SegmentStatusChecker.java | 8 +- .../helix/core/PinotHelixResourceManager.java | 28 ++++- .../helix/core/minion/PinotTaskManager.java | 8 +- .../core/periodictask/ControllerPeriodicTask.java | 10 +- .../ControllerPeriodicTaskScheduler.java | 60 ----------- .../realtime/PinotLLCRealtimeSegmentManager.java | 32 +++--- .../core/realtime/PinotRealtimeSegmentManager.java | 43 +++----- .../core/realtime/SegmentCompletionManager.java | 54 +++++----- .../core/relocation/RealtimeSegmentRelocator.java | 8 +- .../helix/core/retention/RetentionManager.java | 8 +- .../core/statemodel/LeadControllerChecker.java | 57 ++++++++++ ...rollerResourceMasterSlaveStateModelFactory.java | 64 +++++++++++ .../BrokerResourceValidationManager.java | 5 +- .../validation/OfflineSegmentIntervalChecker.java | 13 ++- .../RealtimeSegmentValidationManager.java | 8 +- .../controller/validation/StorageQuotaChecker.java | 14 +-- .../controller/helix/PinotControllerModeTest.java | 16 ++- .../controller/helix/SegmentStatusCheckerTest.java | 73 +++++++++++-- .../periodictask/ControllerPeriodicTaskTest.java | 7 +- .../PinotLLCRealtimeSegmentManagerTest.java | 7 +- .../helix/core/realtime/SegmentCompletionTest.java | 10 +- .../relocation/RealtimeSegmentRelocatorTest.java | 14 ++- .../helix/core/retention/RetentionManagerTest.java | 12 ++- 
.../validation/StorageQuotaCheckerTest.java | 27 +++-- .../server/realtime/ControllerLeaderLocator.java | 17 +-- 32 files changed, 509 insertions(+), 415 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java index 21f2153..b000246 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java @@ -500,4 +500,13 @@ public class HelixHelper { public static Set<String> getBrokerInstancesForTenant(List<InstanceConfig> instanceConfigs, String tenant) { return new HashSet<>(HelixHelper.getInstancesWithTag(instanceConfigs, TagNameUtils.getBrokerTagForTenant(tenant))); } + + /** + * Gets hash code for table. + * @param rawTableName table name + * @return hash code + */ + public static int getHashCodeForTable(String rawTableName) { + return rawTableName.hashCode(); + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java deleted file mode 100644 index 7d4705e..0000000 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.controller; - -import java.util.HashMap; -import java.util.Map; -import javax.annotation.concurrent.ThreadSafe; -import org.apache.helix.HelixManager; -import org.apache.pinot.common.metrics.ControllerGauge; -import org.apache.pinot.common.metrics.ControllerMetrics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Single place for listening on controller changes. - * This should be created at controller startup and everyone who wants to listen to controller changes should subscribe. - */ -@ThreadSafe -public class ControllerLeadershipManager { - private static final Logger LOGGER = LoggerFactory.getLogger(ControllerLeadershipManager.class); - - private final HelixManager _helixControllerManager; - private final ControllerMetrics _controllerMetrics; - - private Map<String, LeadershipChangeSubscriber> _subscribers = new HashMap<>(); - private boolean _amILeader = false; - - public ControllerLeadershipManager(HelixManager helixControllerManager, ControllerMetrics controllerMetrics) { - _helixControllerManager = helixControllerManager; - _controllerMetrics = controllerMetrics; - _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PINOT_CONTROLLER_LEADER, 0L); - } - - /** - * Subscribes to changes in the controller leadership. 
- * <p>If controller is already leader, invoke {@link LeadershipChangeSubscriber#onBecomingLeader()} - */ - public synchronized void subscribe(String name, LeadershipChangeSubscriber subscriber) { - LOGGER.info("{} subscribing to leadership changes", name); - _subscribers.put(name, subscriber); - if (_amILeader) { - subscriber.onBecomingLeader(); - } - } - - public boolean isLeader() { - return _amILeader; - } - - /** - * Stops the service. - * <p>If controller is leader, invoke {@link ControllerLeadershipManager#onBecomingNonLeader()} - */ - public synchronized void stop() { - if (_amILeader) { - onBecomingNonLeader(); - } - } - - /** - * Callback on changes in the controller. Should be registered to the controller callback. - */ - synchronized void onControllerChange() { - if (_helixControllerManager.isLeader()) { - if (!_amILeader) { - _amILeader = true; - LOGGER.info("Became leader"); - onBecomingLeader(); - } else { - LOGGER.info("Already leader. Duplicate notification"); - } - } else { - if (_amILeader) { - _amILeader = false; - LOGGER.info("Lost leadership"); - onBecomingNonLeader(); - } else { - LOGGER.info("Already not leader. 
Duplicate notification"); - } - } - } - - private void onBecomingLeader() { - long startTimeMs = System.currentTimeMillis(); - _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PINOT_CONTROLLER_LEADER, 1L); - for (LeadershipChangeSubscriber subscriber : _subscribers.values()) { - subscriber.onBecomingLeader(); - } - LOGGER.info("Finished on becoming leader in {}ms", System.currentTimeMillis() - startTimeMs); - } - - private void onBecomingNonLeader() { - long startTimeMs = System.currentTimeMillis(); - _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PINOT_CONTROLLER_LEADER, 0L); - for (LeadershipChangeSubscriber subscriber : _subscribers.values()) { - subscriber.onBecomingNonLeader(); - } - LOGGER.info("Finished on becoming non-leader in {}ms", System.currentTimeMillis() - startTimeMs); - } -} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java index c313616..f7eb2a0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java @@ -37,7 +37,6 @@ import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; import org.apache.helix.SystemPropertyKeys; -import org.apache.helix.api.listeners.ControllerChangeListener; import org.apache.helix.task.TaskDriver; import org.apache.pinot.common.Utils; import org.apache.pinot.common.metrics.ControllerMeter; @@ -54,7 +53,6 @@ import org.apache.pinot.controller.helix.SegmentStatusChecker; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; -import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTaskScheduler; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager; import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager; @@ -68,6 +66,7 @@ import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker; import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager; import org.apache.pinot.core.crypt.PinotCrypterFactory; import org.apache.pinot.core.periodictask.PeriodicTask; +import org.apache.pinot.core.periodictask.PeriodicTaskScheduler; import org.apache.pinot.filesystem.PinotFSFactory; import org.glassfish.hk2.utilities.binding.AbstractBinder; import org.slf4j.Logger; @@ -107,12 +106,12 @@ public class ControllerStarter { private RetentionManager _retentionManager; private SegmentStatusChecker _segmentStatusChecker; private PinotTaskManager _taskManager; - private ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler; + private PeriodicTaskScheduler _periodicTaskScheduler; private PinotHelixTaskResourceManager _helixTaskResourceManager; private PinotRealtimeSegmentManager _realtimeSegmentsManager; private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager; private SegmentCompletionManager _segmentCompletionManager; - private ControllerLeadershipManager _controllerLeadershipManager; + private LeadControllerManager _leadControllerManager; private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList; public ControllerStarter(ControllerConf conf) { @@ -184,10 +183,6 @@ public class ControllerStarter { return _taskManager; } - public ControllerLeadershipManager getControllerLeadershipManager() { - return _controllerLeadershipManager; - } - public void start() { LOGGER.info("Starting Pinot controller in mode: {}.", _controllerMode.name()); Utils.logVersions(); @@ -230,9 +225,6 @@ public 
class ControllerStarter { () -> _controllerMetrics.addMeteredGlobalValue(ControllerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L)); _serviceStatusCallbackList.add(generateServiceStatusCallback(_helixControllerManager)); - - LOGGER.info("Initializing controller leadership manager"); - _controllerLeadershipManager = new ControllerLeadershipManager(_helixControllerManager, _controllerMetrics); } private void setUpPinotController() { @@ -255,30 +247,25 @@ public class ControllerStarter { _helixResourceManager.start(); HelixManager helixParticipantManager = _helixResourceManager.getHelixZkManager(); - LOGGER.info("Registering controller leadership manager"); - // TODO: when Helix separation is completed, leadership only depends on the master in leadControllerResource, remove - // ControllerLeadershipManager and this callback. - helixParticipantManager.addControllerListener( - (ControllerChangeListener) changeContext -> _controllerLeadershipManager.onControllerChange()); + // Get lead controller manager from resource manager. + _leadControllerManager = _helixResourceManager.getLeadControllerManager(); LOGGER.info("Starting task resource manager"); _helixTaskResourceManager = new PinotHelixTaskResourceManager(new TaskDriver(helixParticipantManager)); // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager LOGGER.info("Starting realtime segment manager"); - _pinotLLCRealtimeSegmentManager = - new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics, - _controllerLeadershipManager); - // TODO: Need to put this inside HelixResourceManager when ControllerLeadershipManager is removed. + new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics, _leadControllerManager); + // TODO: Need to put this inside HelixResourceManager when HelixControllerLeadershipManager is removed. 
_helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager); _segmentCompletionManager = new SegmentCompletionManager(helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics, - _controllerLeadershipManager, _config.getSegmentCommitTimeoutSeconds()); + _leadControllerManager, _config.getSegmentCommitTimeoutSeconds()); if (_config.getHLCTablesAllowed()) { LOGGER.info("Realtime tables with High Level consumers will be supported"); - _realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager, _controllerLeadershipManager); + _realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager, _leadControllerManager); _realtimeSegmentsManager.start(_controllerMetrics); } else { LOGGER.info("Realtime tables with High Level consumers will NOT be supported"); @@ -288,8 +275,10 @@ public class ControllerStarter { // Setting up periodic tasks List<PeriodicTask> controllerPeriodicTasks = setupControllerPeriodicTasks(); LOGGER.info("Init controller periodic tasks scheduler"); - _controllerPeriodicTaskScheduler = new ControllerPeriodicTaskScheduler(); - _controllerPeriodicTaskScheduler.init(controllerPeriodicTasks, _controllerLeadershipManager); + _periodicTaskScheduler = new PeriodicTaskScheduler(); + _periodicTaskScheduler.init(controllerPeriodicTasks); + + _periodicTaskScheduler.start(); LOGGER.info("Registering rebalance segments factory"); _helixResourceManager @@ -327,7 +316,7 @@ public class ControllerStarter { bind(_controllerMetrics).to(ControllerMetrics.class); bind(accessControlFactory).to(AccessControlFactory.class); bind(metadataEventNotifierFactory).to(MetadataEventNotifierFactory.class); - bind(_controllerLeadershipManager).to(ControllerLeadershipManager.class); + bind(_leadControllerManager).to(LeadControllerManager.class); } }); @@ -433,24 +422,29 @@ public class ControllerStarter { protected List<PeriodicTask> setupControllerPeriodicTasks() { LOGGER.info("Setting up 
periodic tasks"); List<PeriodicTask> periodicTasks = new ArrayList<>(); - _taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _config, _controllerMetrics); + _taskManager = + new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _leadControllerManager, _config, + _controllerMetrics); periodicTasks.add(_taskManager); - _retentionManager = new RetentionManager(_helixResourceManager, _config, _controllerMetrics); + _retentionManager = + new RetentionManager(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); periodicTasks.add(_retentionManager); _offlineSegmentIntervalChecker = - new OfflineSegmentIntervalChecker(_config, _helixResourceManager, new ValidationMetrics(_metricsRegistry), - _controllerMetrics); + new OfflineSegmentIntervalChecker(_config, _helixResourceManager, _leadControllerManager, + new ValidationMetrics(_metricsRegistry), _controllerMetrics); periodicTasks.add(_offlineSegmentIntervalChecker); _realtimeSegmentValidationManager = - new RealtimeSegmentValidationManager(_config, _helixResourceManager, _pinotLLCRealtimeSegmentManager, - new ValidationMetrics(_metricsRegistry), _controllerMetrics); + new RealtimeSegmentValidationManager(_config, _helixResourceManager, _leadControllerManager, + _pinotLLCRealtimeSegmentManager, new ValidationMetrics(_metricsRegistry), _controllerMetrics); periodicTasks.add(_realtimeSegmentValidationManager); _brokerResourceValidationManager = - new BrokerResourceValidationManager(_config, _helixResourceManager, _controllerMetrics); + new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics); periodicTasks.add(_brokerResourceValidationManager); - _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _config, _controllerMetrics); + _segmentStatusChecker = + new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); 
periodicTasks.add(_segmentStatusChecker); - _realtimeSegmentRelocator = new RealtimeSegmentRelocator(_helixResourceManager, _config, _controllerMetrics); + _realtimeSegmentRelocator = + new RealtimeSegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); periodicTasks.add(_realtimeSegmentRelocator); return periodicTasks; @@ -478,9 +472,10 @@ public class ControllerStarter { private void stopPinotController() { try { - // Stopping ControllerLeadershipManager has to be done before stopping HelixResourceManager. - LOGGER.info("Stopping controller leadership manager"); - _controllerLeadershipManager.stop(); + // Stopping periodic tasks has to be done before stopping HelixResourceManager. + // Stop controller periodic task. + LOGGER.info("Stopping controller periodic tasks"); + _periodicTaskScheduler.stop(); // Stop PinotLLCSegmentManager before stopping Jersey API. It is possible that stopping Jersey API // may interrupt the handlers waiting on an I/O. diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java new file mode 100644 index 0000000..3ec507e --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller; + +import org.apache.pinot.common.config.TableNameBuilder; +import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.statemodel.LeadControllerChecker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.pinot.common.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE; + + +public class LeadControllerManager { + private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerManager.class); + + private final LeadControllerChecker _leadControllerChecker; + private PinotHelixResourceManager _pinotHelixResourceManager; + + public LeadControllerManager() { + _leadControllerChecker = new LeadControllerChecker(); + } + + public void registerResourceManager(PinotHelixResourceManager pinotHelixResourceManager) { + _pinotHelixResourceManager = pinotHelixResourceManager; + } + + /** + * Check whether the current controller is the leader for the given table. Return true if current controller is the leader for this table. + * Otherwise check whether the current controller is helix leader. + * @param tableName table name with/without table type. 
+ */ + public boolean isLeaderForTable(String tableName) { + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + int partitionIndex = HelixHelper.getHashCodeForTable(rawTableName) % NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE; + if (_leadControllerChecker.isPartitionLeader(partitionIndex)) { + return true; + } else { + return isHelixLeader(); + } + } + + public synchronized void addPartitionLeader(String partitionName) { + _leadControllerChecker.addPartitionLeader(partitionName); + } + + public synchronized void removePartitionLeader(String partitionName) { + _leadControllerChecker.removePartitionLeader(partitionName); + } + + private boolean isHelixLeader() { + return _pinotHelixResourceManager != null && _pinotHelixResourceManager.isHelixLeader(); + } + + public void onLeadControllerChange() { + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadershipChangeSubscriber.java b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadershipChangeSubscriber.java deleted file mode 100644 index d2c646f..0000000 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadershipChangeSubscriber.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.controller; - -/** - * Interface for a subscriber to the {@link ControllerLeadershipManager} - */ -public interface LeadershipChangeSubscriber { - - /** - * Callback to invoke on becoming leader - */ - void onBecomingLeader(); - - /** - * Callback to invoke on losing leadership - */ - void onBecomingNonLeader(); -} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java index 5259d9f..bd4b8a0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java @@ -57,7 +57,6 @@ import javax.ws.rs.core.Response; import org.apache.commons.httpclient.HttpConnectionManager; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; -import org.apache.helix.ZNRecord; import org.apache.helix.model.IdealState; import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.metrics.ControllerMeter; @@ -70,7 +69,7 @@ import org.apache.pinot.common.utils.JsonUtils; import org.apache.pinot.common.utils.StringUtil; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.controller.ControllerConf; -import org.apache.pinot.controller.ControllerLeadershipManager; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.api.access.AccessControl; import org.apache.pinot.controller.api.access.AccessControlFactory; import org.apache.pinot.controller.api.upload.SegmentValidator; @@ -117,7 +116,7 @@ public class PinotSegmentUploadRestletResource { AccessControlFactory _accessControlFactory; @Inject 
- ControllerLeadershipManager _controllerLeadershipManager; + LeadControllerManager _leadControllerManager; @GET @Produces(MediaType.APPLICATION_JSON) @@ -325,7 +324,7 @@ public class PinotSegmentUploadRestletResource { // Validate segment SegmentValidatorResponse segmentValidatorResponse = new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager, - _controllerMetrics, _controllerLeadershipManager) + _controllerMetrics, _leadControllerManager) .validateSegment(rawTableName, segmentMetadata, tempSegmentDir); // Zk operations diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java index 39a9657..42c7cf8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java @@ -34,7 +34,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.segment.SegmentMetadata; import org.apache.pinot.common.utils.time.TimeUtils; import org.apache.pinot.controller.ControllerConf; -import org.apache.pinot.controller.ControllerLeadershipManager; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.api.resources.ControllerApplicationException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.util.TableSizeReader; @@ -55,17 +55,17 @@ public class SegmentValidator { private final Executor _executor; private final HttpConnectionManager _connectionManager; private final ControllerMetrics _controllerMetrics; - private final ControllerLeadershipManager _controllerLeadershipManager; + private final LeadControllerManager _leadControllerManager; public SegmentValidator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf 
controllerConf, Executor executor, HttpConnectionManager connectionManager, ControllerMetrics controllerMetrics, - ControllerLeadershipManager controllerLeadershipManager) { + LeadControllerManager leadControllerManager) { _pinotHelixResourceManager = pinotHelixResourceManager; _controllerConf = controllerConf; _executor = executor; _connectionManager = connectionManager; _controllerMetrics = controllerMetrics; - _controllerLeadershipManager = controllerLeadershipManager; + _leadControllerManager = leadControllerManager; } public SegmentValidatorResponse validateSegment(String rawTableName, SegmentMetadata segmentMetadata, @@ -135,7 +135,7 @@ public class SegmentValidator { new TableSizeReader(_executor, _connectionManager, _controllerMetrics, _pinotHelixResourceManager); StorageQuotaChecker quotaChecker = new StorageQuotaChecker(offlineTableConfig, tableSizeReader, _controllerMetrics, _pinotHelixResourceManager, - _controllerLeadershipManager); + _leadControllerManager); return quotaChecker.isSegmentStorageWithinQuota(segmentFile, metadata.getName(), _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index f40e2b3..9261420 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -29,6 +29,7 @@ import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import org.slf4j.Logger; @@ -56,10 +57,11 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh * @param pinotHelixResourceManager The resource checker used to interact with Helix * @param config The controller configuration object */ - public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config, - ControllerMetrics controllerMetrics) { + public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, + LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) { super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(), - config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics); + config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, + controllerMetrics); _waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds(); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index a4d4d0b..cb68304 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -53,7 +53,6 @@ import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.PropertyPathBuilder; import org.apache.helix.ZNRecord; -import org.apache.helix.examples.MasterSlaveStateModelFactory; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor; import org.apache.helix.model.CurrentState; @@ -99,6 +98,7 @@ import 
org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvid import org.apache.pinot.common.utils.retry.RetryPolicies; import org.apache.pinot.common.utils.retry.RetryPolicy; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.api.pojos.Instance; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy; @@ -106,6 +106,7 @@ import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategy; import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyEnum; import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyFactory; +import org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceMasterSlaveStateModelFactory; import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils; import org.apache.pinot.controller.helix.starter.HelixConfig; import org.apache.pinot.core.realtime.stream.StreamConfig; @@ -144,6 +145,7 @@ public class PinotHelixResourceManager { private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager; private RebalanceSegmentStrategyFactory _rebalanceSegmentStrategyFactory; private TableRebalancer _tableRebalancer; + private LeadControllerManager _leadControllerManager; public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName, @Nonnull String controllerInstanceId, String dataDir, long externalViewOnlineToOfflineTimeoutMillis, @@ -170,6 +172,8 @@ public class PinotHelixResourceManager { * Create Helix cluster if needed, and then start a Pinot controller instance. */ public synchronized void start() { + // LeadControllerManager needs to be initialized before registering to Helix participant. 
+ _leadControllerManager = new LeadControllerManager(); _helixZkManager = registerAndConnectAsHelixParticipant(); _helixAdmin = _helixZkManager.getClusterManagmentTool(); _propertyStore = _helixZkManager.getHelixPropertyStore(); @@ -189,6 +193,7 @@ public class PinotHelixResourceManager { _segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore); ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster); _tableRebalancer = new TableRebalancer(_helixZkManager, _helixAdmin, _helixClusterName); + _leadControllerManager.registerResourceManager(this); } /** @@ -255,6 +260,16 @@ public class PinotHelixResourceManager { return _propertyStore; } + + /** + * Get lead controller manager. + * + * @return lead controller manager + */ + public LeadControllerManager getLeadControllerManager() { + return _leadControllerManager; + } + /** * Register and connect to Helix cluster as PARTICIPANT role. */ @@ -263,8 +278,8 @@ public class PinotHelixResourceManager { HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, InstanceType.PARTICIPANT, _helixZkURL); // Registers Master-Slave state model to state machine engine, which is for calculating participant assignment in lead controller resource. 
- helixManager.getStateMachineEngine() - .registerStateModelFactory(MasterSlaveSMD.name, new MasterSlaveStateModelFactory()); + helixManager.getStateMachineEngine().registerStateModelFactory(MasterSlaveSMD.name, + new LeadControllerResourceMasterSlaveStateModelFactory(_leadControllerManager)); try { helixManager.connect(); @@ -2372,6 +2387,13 @@ public class PinotHelixResourceManager { return endpointToInstance; } + public boolean isHelixLeader() { + PropertyKey propertyKey = _keyBuilder.controllerLeader(); + LiveInstance liveInstance = _helixDataAccessor.getProperty(propertyKey); + String helixLeaderInstanceId = liveInstance.getInstanceName(); + return _instanceId.equals(CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + helixLeaderInstanceId); + } + /* * Uncomment and use for testing on a real cluster public static void main(String[] args) throws Exception { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index 7a09c00..649b966 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -29,6 +29,7 @@ import org.apache.pinot.common.config.TableTaskConfig; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator; import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry; @@ -51,10 +52,11 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { private final TaskGeneratorRegistry 
_taskGeneratorRegistry; public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager, - PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf, - ControllerMetrics controllerMetrics) { + PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager, + ControllerConf controllerConf, ControllerMetrics controllerMetrics) { super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(), - controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, controllerMetrics); + controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, leadControllerManager, + controllerMetrics); _helixTaskResourceManager = helixTaskResourceManager; _clusterInfoProvider = new ClusterInfoProvider(helixResourceManager, helixTaskResourceManager, controllerConf); _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java index 7e764f3..a782385 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java @@ -23,6 +23,7 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.core.periodictask.BasePeriodicTask; import org.slf4j.Logger; @@ -40,12 +41,15 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask { private static 
final Logger LOGGER = LoggerFactory.getLogger(ControllerPeriodicTask.class); protected final PinotHelixResourceManager _pinotHelixResourceManager; + protected final LeadControllerManager _leadControllerManager; protected final ControllerMetrics _controllerMetrics; public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds, - PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics controllerMetrics) { + PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, + ControllerMetrics controllerMetrics) { super(taskName, runFrequencyInSeconds, initialDelayInSeconds); _pinotHelixResourceManager = pinotHelixResourceManager; + _leadControllerManager = leadControllerManager; _controllerMetrics = controllerMetrics; } @@ -75,6 +79,10 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask { LOGGER.info("Task: {} is stopped, early terminate the task", _taskName); break; } + // Check if current controller is the leader for this table. + if (!_leadControllerManager.isLeaderForTable(tableNameWithType)) { + continue; + } try { processTable(tableNameWithType, context); } catch (Exception e) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java deleted file mode 100644 index 8a2b63c..0000000 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.controller.helix.core.periodictask; - -import java.util.List; -import org.apache.pinot.controller.ControllerLeadershipManager; -import org.apache.pinot.controller.LeadershipChangeSubscriber; -import org.apache.pinot.core.periodictask.PeriodicTask; -import org.apache.pinot.core.periodictask.PeriodicTaskScheduler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * A {@link PeriodicTaskScheduler} for scheduling {@link ControllerPeriodicTask} according to controller leadership changes. 
- * Any controllerPeriodicTasks provided during initialization, will run only on leadership, and stop when leadership lost - */ -public class ControllerPeriodicTaskScheduler extends PeriodicTaskScheduler implements LeadershipChangeSubscriber { - - private static final Logger LOGGER = LoggerFactory.getLogger(ControllerPeriodicTaskScheduler.class); - - /** - * Initialize the {@link ControllerPeriodicTaskScheduler} with the list of {@link ControllerPeriodicTask} created at startup - * This is called only once during controller startup - * @param controllerPeriodicTasks - * @param controllerLeadershipManager - */ - public void init(List<PeriodicTask> controllerPeriodicTasks, ControllerLeadershipManager controllerLeadershipManager) { - super.init(controllerPeriodicTasks); - controllerLeadershipManager.subscribe(ControllerPeriodicTaskScheduler.class.getName(), this); - } - - @Override - public void onBecomingLeader() { - LOGGER.info("Received callback for controller leadership gain. Starting PeriodicTaskScheduler."); - start(); - } - - @Override - public void onBecomingNonLeader() { - LOGGER.info("Received callback for controller leadership loss. 
Stopping PeriodicTaskScheduler."); - stop(); - } -} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index c833261..0982c30 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -66,7 +66,7 @@ import org.apache.pinot.common.utils.StringUtil; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.common.utils.retry.RetryPolicies; import org.apache.pinot.controller.ControllerConf; -import org.apache.pinot.controller.ControllerLeadershipManager; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator; @@ -123,13 +123,13 @@ public class PinotLLCRealtimeSegmentManager { private final TableConfigCache _tableConfigCache; private final StreamPartitionAssignmentGenerator _streamPartitionAssignmentGenerator; private final FlushThresholdUpdateManager _flushThresholdUpdateManager; - private final ControllerLeadershipManager _controllerLeadershipManager; + private final LeadControllerManager _leadControllerManager; private volatile boolean _isStopping = false; private AtomicInteger _numCompletingSegments = new AtomicInteger(0); public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf, - ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) { + ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager) { 
_helixAdmin = helixResourceManager.getHelixAdmin(); _helixManager = helixResourceManager.getHelixZkManager(); _propertyStore = helixResourceManager.getPropertyStore(); @@ -145,7 +145,7 @@ public class PinotLLCRealtimeSegmentManager { _tableConfigCache = new TableConfigCache(_propertyStore); _streamPartitionAssignmentGenerator = new StreamPartitionAssignmentGenerator(_helixManager); _flushThresholdUpdateManager = new FlushThresholdUpdateManager(); - _controllerLeadershipManager = controllerLeadershipManager; + _leadControllerManager = leadControllerManager; } @@ -182,8 +182,8 @@ public class PinotLLCRealtimeSegmentManager { LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get()); } - protected boolean isLeader() { - return _controllerLeadershipManager.isLeader(); + protected boolean isLeader(String tableName) { + return _leadControllerManager.isLeaderForTable(tableName); } protected boolean isConnected() { @@ -341,10 +341,10 @@ public class PinotLLCRealtimeSegmentManager { URI uriToMoveTo = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), segmentName)); PinotFS pinotFS = PinotFSFactory.create(baseDirURI.getScheme()); - if (!isConnected() || !isLeader()) { + if (!isConnected() || !isLeader(tableName)) { // We can potentially log a different value than what we saw .... 
LOGGER.warn("Lost leadership while committing segment file {}, {} for table {}: isLeader={}, isConnected={}", - segmentName, segmentLocation, tableName, isLeader(), isConnected()); + segmentName, segmentLocation, tableName, isLeader(tableName), isConnected()); _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L); return false; } @@ -540,17 +540,17 @@ public class PinotLLCRealtimeSegmentManager { final String oldZnodePath = ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, committingSegmentNameStr); - if (!isConnected() || !isLeader()) { + if (!isConnected() || !isLeader(realtimeTableName)) { // We can potentially log a different value than what we saw .... LOGGER.warn("Lost leadership while committing segment metadata for {} for table {}: isLeader={}, isConnected={}", - committingSegmentNameStr, realtimeTableName, isLeader(), isConnected()); + committingSegmentNameStr, realtimeTableName, isLeader(realtimeTableName), isConnected()); _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L); return false; } boolean success = writeSegmentToPropertyStore(oldZnodePath, oldZnRecord, realtimeTableName, stat.getVersion()); if (!success) { LOGGER.warn("Fail to write old segment to property store for {} for table {}: isLeader={}, isConnected={}", - committingSegmentNameStr, realtimeTableName, isLeader(), isConnected()); + committingSegmentNameStr, realtimeTableName, isLeader(realtimeTableName), isConnected()); } return success; } @@ -599,11 +599,11 @@ public class PinotLLCRealtimeSegmentManager { ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, newSegmentNameStr); if (!isNewTableSetup) { - if (!isLeader() || !isConnected()) { + if (!isLeader(realtimeTableName) || !isConnected()) { // We can potentially log a different value than what we saw .... 
LOGGER.warn( "Lost leadership while committing new segment metadata for {} for table {}: isLeader={}, isConnected={}", - newSegmentNameStr, rawTableName, isLeader(), isConnected()); + newSegmentNameStr, rawTableName, isLeader(realtimeTableName), isConnected()); _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L); return false; } @@ -612,7 +612,7 @@ public class PinotLLCRealtimeSegmentManager { boolean success = writeSegmentToPropertyStore(newZnodePath, newZnRecord, realtimeTableName); if (!success) { LOGGER.warn("Fail to write new segment to property store for {} for table {}: isLeader={}, isConnected={}", - newSegmentNameStr, rawTableName, isLeader(), isConnected()); + newSegmentNameStr, rawTableName, isLeader(realtimeTableName), isConnected()); } return success; } @@ -1347,8 +1347,4 @@ public class PinotLLCRealtimeSegmentManager { return idealState; } - - public ControllerLeadershipManager getControllerLeadershipManager() { - return _controllerLeadershipManager; - } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java index 331a4a0..1f115fe 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java @@ -52,8 +52,7 @@ import org.apache.pinot.common.utils.HLCSegmentName; import org.apache.pinot.common.utils.SegmentName; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.common.utils.retry.RetryPolicies; -import org.apache.pinot.controller.ControllerLeadershipManager; -import org.apache.pinot.controller.LeadershipChangeSubscriber; +import org.apache.pinot.controller.LeadControllerManager; import 
org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder; import org.apache.pinot.core.query.utils.Pair; @@ -66,7 +65,7 @@ import org.slf4j.LoggerFactory; /** * Realtime segment manager, which assigns realtime segments to server instances so that they can consume from the stream. */ -public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkChildListener, IZkDataListener, LeadershipChangeSubscriber { +public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkChildListener, IZkDataListener { private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeSegmentManager.class); private static final String TABLE_CONFIG = "/CONFIGS/TABLE"; private static final String SEGMENTS_PATH = "/SEGMENTS"; @@ -80,12 +79,12 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh private final PinotHelixResourceManager _pinotHelixResourceManager; private ZkClient _zkClient; private ControllerMetrics _controllerMetrics; - private final ControllerLeadershipManager _controllerLeadershipManager; + private final LeadControllerManager _leadControllerManager; public PinotRealtimeSegmentManager(PinotHelixResourceManager pinotManager, - ControllerLeadershipManager controllerLeadershipManager) { + LeadControllerManager leadControllerManager) { _pinotHelixResourceManager = pinotManager; - _controllerLeadershipManager = controllerLeadershipManager; + _leadControllerManager = leadControllerManager; String clusterName = _pinotHelixResourceManager.getHelixClusterName(); _propertyStorePath = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName); _tableConfigPath = _propertyStorePath + TABLE_CONFIG; @@ -104,9 +103,6 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh _zkClient.subscribeChildChanges(_tableConfigPath, this); _zkClient.subscribeDataChanges(_tableConfigPath, this); - // Subscribe to 
leadership changes - _controllerLeadershipManager.subscribe(PinotLLCRealtimeSegmentManager.class.getName(), this); - // Setup change listeners for already existing tables, if any. processPropertyStoreChange(_tableConfigPath); } @@ -128,6 +124,11 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh continue; } + // Skip if the current controller isn't the leader of this table + if (!_leadControllerManager.isLeaderForTable(realtimeTableName)) { + continue; + } + StreamConfig metadata = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs()); if (metadata.hasHighLevelConsumerType()) { idealStateMap.put(realtimeTableName, _pinotHelixResourceManager.getHelixAdmin() @@ -273,10 +274,6 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh } } - private boolean isLeader() { - return _controllerLeadershipManager.isLeader(); - } - @Override public synchronized void onDataChange(String path) { LOGGER.info("PinotRealtimeSegmentManager.onDataChange: {}", path); @@ -300,13 +297,9 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh LOGGER.info("Processing change notification for path: {}", path); refreshWatchers(path); - if (isLeader()) { - if (path.matches(REALTIME_SEGMENT_PROPERTY_STORE_PATH_PATTERN) || path - .matches(REALTIME_TABLE_CONFIG_PROPERTY_STORE_PATH_PATTERN) || path.equals(CONTROLLER_LEADER_CHANGE)) { - assignRealtimeSegmentsToServerInstancesIfNecessary(); - } - } else { - LOGGER.info("Not the leader of this cluster, ignoring realtime segment property store change."); + if (path.matches(REALTIME_SEGMENT_PROPERTY_STORE_PATH_PATTERN) || path + .matches(REALTIME_TABLE_CONFIG_PROPERTY_STORE_PATH_PATTERN) || path.equals(CONTROLLER_LEADER_CHANGE)) { + assignRealtimeSegmentsToServerInstancesIfNecessary(); } } catch (Exception e) { LOGGER.error("Caught exception while processing change for path {}", path, e); @@ -415,14 +408,4 @@ public class PinotRealtimeSegmentManager 
implements HelixPropertyListener, IZkCh LOGGER.info("PinotRealtimeSegmentManager.handleDataDeleted: {}", dataPath); processPropertyStoreChange(dataPath); } - - @Override - public void onBecomingLeader() { - processPropertyStoreChange(CONTROLLER_LEADER_CHANGE); - } - - @Override - public void onBecomingNonLeader() { - processPropertyStoreChange(CONTROLLER_LEADER_CHANGE); - } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java index a4da6b4..8935c6b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java @@ -19,7 +19,6 @@ package org.apache.pinot.controller.helix.core.realtime; import com.google.common.annotations.VisibleForTesting; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -29,7 +28,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.helix.HelixManager; -import org.apache.helix.ZNRecord; import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; import org.apache.pinot.common.metrics.ControllerMeter; @@ -37,12 +35,13 @@ import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.controller.ControllerConf; -import org.apache.pinot.controller.ControllerLeadershipManager; +import org.apache.pinot.controller.LeadControllerManager; import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pinot.common.utils.SegmentName.SEPARATOR; + /** * This is a singleton class in the controller that drives the state machines for segments that are in the @@ -73,7 +72,7 @@ public class SegmentCompletionManager { private final Map<String, Long> _commitTimeMap = new ConcurrentHashMap<>(); private final PinotLLCRealtimeSegmentManager _segmentManager; private final ControllerMetrics _controllerMetrics; - private final ControllerLeadershipManager _controllerLeadershipManager; + private final LeadControllerManager _leadControllerManager; private final Lock[] _fsmLocks; private static final int NUM_FSM_LOCKS = 20; @@ -87,12 +86,12 @@ public class SegmentCompletionManager { // TODO keep some history of past committed segments so that we can avoid looking up PROPERTYSTORE if some server comes in late. public SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager, - ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager, + ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager, int segmentCommitTimeoutSeconds) { _helixManager = helixManager; _segmentManager = segmentManager; _controllerMetrics = controllerMetrics; - _controllerLeadershipManager = controllerLeadershipManager; + _leadControllerManager = leadControllerManager; SegmentCompletionProtocol .setMaxSegmentCommitTimeMs(TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS)); _fsmLocks = new Lock[NUM_FSM_LOCKS]; @@ -163,11 +162,12 @@ public class SegmentCompletionManager { * that it currently has (i.e. next offset that it will consume, if it continues to consume). 
*/ public SegmentCompletionProtocol.Response segmentConsumed(SegmentCompletionProtocol.Request.Params reqParams) { - if (!isLeader() || !_helixManager.isConnected()) { + final String segmentNameStr = reqParams.getSegmentName(); + final String tableName = segmentNameStr.split(SEPARATOR)[0]; + if (!isLeader(tableName) || !_helixManager.isConnected()) { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L); return SegmentCompletionProtocol.RESP_NOT_LEADER; } - final String segmentNameStr = reqParams.getSegmentName(); final String instanceId = reqParams.getInstanceId(); final String stopReason = reqParams.getReason(); final long offset = reqParams.getOffset(); @@ -201,11 +201,12 @@ public class SegmentCompletionManager { */ public SegmentCompletionProtocol.Response segmentCommitStart( final SegmentCompletionProtocol.Request.Params reqParams) { - if (!isLeader() || !_helixManager.isConnected()) { + final String segmentNameStr = reqParams.getSegmentName(); + final String tableName = segmentNameStr.split(SEPARATOR)[0]; + if (!isLeader(tableName) || !_helixManager.isConnected()) { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L); return SegmentCompletionProtocol.RESP_NOT_LEADER; } - final String segmentNameStr = reqParams.getSegmentName(); final String instanceId = reqParams.getInstanceId(); final long offset = reqParams.getOffset(); LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); @@ -225,11 +226,12 @@ public class SegmentCompletionManager { } public SegmentCompletionProtocol.Response extendBuildTime(final SegmentCompletionProtocol.Request.Params reqParams) { - if (!isLeader() || !_helixManager.isConnected()) { + final String segmentNameStr = reqParams.getSegmentName(); + final String tableName = segmentNameStr.split(SEPARATOR)[0]; + if (!isLeader(tableName) || !_helixManager.isConnected()) { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L); return 
SegmentCompletionProtocol.RESP_NOT_LEADER; } - final String segmentNameStr = reqParams.getSegmentName(); final String instanceId = reqParams.getInstanceId(); final long offset = reqParams.getOffset(); final int extTimeSec = reqParams.getExtraTimeSec(); @@ -256,11 +258,12 @@ public class SegmentCompletionManager { */ public SegmentCompletionProtocol.Response segmentStoppedConsuming( SegmentCompletionProtocol.Request.Params reqParams) { - if (!isLeader() || !_helixManager.isConnected()) { + final String segmentNameStr = reqParams.getSegmentName(); + final String tableName = segmentNameStr.split(SEPARATOR)[0]; + if (!isLeader(tableName) || !_helixManager.isConnected()) { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L); return SegmentCompletionProtocol.RESP_NOT_LEADER; } - final String segmentNameStr = reqParams.getSegmentName(); final String instanceId = reqParams.getInstanceId(); final long offset = reqParams.getOffset(); final String reason = reqParams.getReason(); @@ -292,11 +295,12 @@ public class SegmentCompletionManager { */ public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams, boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) { - if (!isLeader() || !_helixManager.isConnected()) { + final String segmentNameStr = reqParams.getSegmentName(); + final String tableName = segmentNameStr.split(SEPARATOR)[0]; + if (!isLeader(tableName) || !_helixManager.isConnected()) { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L); return SegmentCompletionProtocol.RESP_NOT_LEADER; } - final String segmentNameStr = reqParams.getSegmentName(); LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); SegmentCompletionFSM fsm = null; SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.RESP_FAILED; @@ -352,6 +356,7 @@ public class SegmentCompletionManager { State _state = State.HOLDING; 
// Typically start off in HOLDING state. final long _startTimeMs; private final LLCSegmentName _segmentName; + private final String _realtimeTableName; private final int _numReplicas; private final Set<String> _excludedServerStateMap; private final Map<String, Long> _commitStateMap; @@ -394,6 +399,7 @@ public class SegmentCompletionManager { private SegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager, SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName, int numReplicas) { _segmentName = segmentName; + _realtimeTableName = _segmentName.getTableName(); _numReplicas = numReplicas; _segmentManager = segmentManager; _commitStateMap = new HashMap<>(_numReplicas); @@ -403,8 +409,8 @@ public class SegmentCompletionManager { _maxTimeToPickWinnerMs = _startTimeMs + MAX_TIME_TO_PICK_WINNER_MS; _maxTimeToNotifyWinnerMs = _startTimeMs + MAX_TIME_TO_NOTIFY_WINNER_MS; long initialCommitTimeMs = - MAX_TIME_TO_NOTIFY_WINNER_MS + _segmentManager.getCommitTimeoutMS(_segmentName.getTableName()); - Long savedCommitTime = _segmentCompletionManager._commitTimeMap.get(segmentName.getTableName()); + MAX_TIME_TO_NOTIFY_WINNER_MS + _segmentManager.getCommitTimeoutMS(_realtimeTableName); + Long savedCommitTime = _segmentCompletionManager._commitTimeMap.get(_realtimeTableName); if (savedCommitTime != null && savedCommitTime > initialCommitTimeMs) { initialCommitTimeMs = savedCommitTime; } @@ -413,7 +419,7 @@ public class SegmentCompletionManager { // The table has a really high value configured for max commit time. Set it to a higher value than default // and go from there. 
LOGGER.info("Configured max commit time {}s too high for table {}, changing to {}s", initialCommitTimeMs / 1000, - segmentName.getTableName(), MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS); + _realtimeTableName, MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS); initialCommitTimeMs = MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS * 1000; } _initialCommitTimeMs = initialCommitTimeMs; @@ -681,14 +687,14 @@ public class SegmentCompletionManager { private SegmentCompletionProtocol.Response abortAndReturnHold(long now, String instanceId, long offset) { _state = State.ABORTED; _segmentCompletionManager._controllerMetrics - .addMeteredTableValue(_segmentName.getTableName(), ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1); + .addMeteredTableValue(_realtimeTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1); return hold(instanceId, offset); } private SegmentCompletionProtocol.Response abortAndReturnFailed() { _state = State.ABORTED; _segmentCompletionManager._controllerMetrics - .addMeteredTableValue(_segmentName.getTableName(), ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1); + .addMeteredTableValue(_realtimeTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1); return SegmentCompletionProtocol.RESP_FAILED; } @@ -1117,7 +1123,7 @@ public class SegmentCompletionManager { } @VisibleForTesting - protected boolean isLeader() { - return _controllerLeadershipManager.isLeader(); + protected boolean isLeader(String tableName) { + return _leadControllerManager.isLeaderForTable(tableName); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java index ea5b05b..2c2c017 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java @@ -38,6 +38,7 
@@ import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.common.utils.retry.RetryPolicies; import org.apache.pinot.common.utils.time.TimeUtils; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator; import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; @@ -55,10 +56,11 @@ import org.slf4j.LoggerFactory; public class RealtimeSegmentRelocator extends ControllerPeriodicTask<Void> { private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentRelocator.class); - public RealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config, - ControllerMetrics controllerMetrics) { + public RealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, + LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) { super("RealtimeSegmentRelocator", getRunFrequencySeconds(config.getRealtimeSegmentRelocatorFrequency()), - config.getRealtimeSegmentRelocationInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics); + config.getRealtimeSegmentRelocationInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, + controllerMetrics); } @Override diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java index 7d5182d..fecb4ac 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java @@ -35,6 +35,7 @@ import org.apache.pinot.common.utils.CommonConstants; 
import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.common.utils.SegmentName; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy; @@ -54,10 +55,11 @@ public class RetentionManager extends ControllerPeriodicTask<Void> { private final int _deletedSegmentsRetentionInDays; - public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config, - ControllerMetrics controllerMetrics) { + public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, + LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) { super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(), - config.getRetentionManagerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics); + config.getRetentionManagerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, + controllerMetrics); _deletedSegmentsRetentionInDays = config.getDeletedSegmentsRetentionInDays(); LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}", diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java new file mode 100644 index 0000000..d2888ed --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.statemodel; + +import com.google.common.base.Preconditions; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.pinot.common.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE; + + +public class LeadControllerChecker { + private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerChecker.class); + + private Map<Integer, Integer> _partitionCache; + + public LeadControllerChecker() { + _partitionCache = new ConcurrentHashMap<>(); + } + + public void addPartitionLeader(String partitionName) { + LOGGER.info("Add Partition: {} to LeadControllerChecker", partitionName); + int partitionIndex = Integer.parseInt(partitionName.substring(partitionName.lastIndexOf("_") + 1)); + _partitionCache.put(partitionIndex, partitionIndex); + } + + public void removePartitionLeader(String partitionName) { + LOGGER.info("Remove Partition: {} from LeadControllerChecker", partitionName); + int partitionIndex = Integer.parseInt(partitionName.substring(partitionName.lastIndexOf("_") + 1)); + _partitionCache.remove(partitionIndex); + } + + public boolean isPartitionLeader(int partitionIndex) { + 
Preconditions + .checkArgument(partitionIndex >= 0 && partitionIndex < NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE, + "Invalid partition index: " + partitionIndex); + return _partitionCache.containsKey(partitionIndex); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java new file mode 100644 index 0000000..d063c58 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.statemodel; + +import org.apache.helix.NotificationContext; +import org.apache.helix.examples.MasterSlaveStateModelFactory; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.pinot.controller.LeadControllerManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class LeadControllerResourceMasterSlaveStateModelFactory extends MasterSlaveStateModelFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerResourceMasterSlaveStateModelFactory.class); + + private final LeadControllerManager _leadControllerManager; + + public LeadControllerResourceMasterSlaveStateModelFactory(LeadControllerManager leadControllerManager) { + super(); + _leadControllerManager = leadControllerManager; + } + + @Override + public StateModel createNewStateModel(String resourceName, String partitionName) { + MasterSlaveStateModel stateModel = new LeadControllerResourceMasterSlaveStateModel(); + stateModel.setPartitionName(partitionName); + return stateModel; + } + + public class LeadControllerResourceMasterSlaveStateModel extends MasterSlaveStateModel { + @Override + public void onBecomeSlaveFromMaster(Message message, NotificationContext context) { + super.onBecomeSlaveFromMaster(message, context); + String partitionName = message.getPartitionName(); + _leadControllerManager.removePartitionLeader(partitionName); + _leadControllerManager.onLeadControllerChange(); + } + + @Override + public void onBecomeMasterFromSlave(Message message, NotificationContext context) { + super.onBecomeMasterFromSlave(message, context); + String partitionName = message.getPartitionName(); + _leadControllerManager.addPartitionLeader(partitionName); + _leadControllerManager.onLeadControllerChange(); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java index 5748d3c..a82a161 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java @@ -24,6 +24,7 @@ import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import org.slf4j.Logger; @@ -37,9 +38,9 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask<Brok private static final Logger LOGGER = LoggerFactory.getLogger(BrokerResourceValidationManager.class); public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager, - ControllerMetrics controllerMetrics) { + LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics) { super("BrokerResourceValidationManager", config.getBrokerResourceValidationFrequencyInSeconds(), - config.getBrokerResourceValidationInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics); + config.getBrokerResourceValidationInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics); } @Override diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java index 7f19395..0eeadc7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java +++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java @@ -30,6 +30,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.metrics.ValidationMetrics; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import org.apache.pinot.controller.util.SegmentIntervalUtils; @@ -50,9 +51,11 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void> private final ValidationMetrics _validationMetrics; public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager, - ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) { + LeadControllerManager leadControllerManager, ValidationMetrics validationMetrics, + ControllerMetrics controllerMetrics) { super("OfflineSegmentIntervalChecker", config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(), - config.getOfflineSegmentIntervalCheckerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics); + config.getOfflineSegmentIntervalCheckerInitialDelayInSeconds(), pinotHelixResourceManager, + leadControllerManager, controllerMetrics); _validationMetrics = validationMetrics; } @@ -88,12 +91,12 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void> if (SegmentIntervalUtils.isValidInterval(timeInterval)) { segmentIntervals.add(timeInterval); } else { - numSegmentsWithInvalidIntervals ++; + numSegmentsWithInvalidIntervals++; } } if (numSegmentsWithInvalidIntervals > 0) { - LOGGER.warn("Table: {} has {} segments with invalid interval", offlineTableName, - numSegmentsWithInvalidIntervals); + LOGGER + .warn("Table: {} has {} segments with invalid interval", 
offlineTableName, numSegmentsWithInvalidIntervals); } Duration frequency = SegmentIntervalUtils.convertToDuration(validationConfig.getSegmentPushFrequency()); numMissingSegments = computeNumMissingSegments(segmentIntervals, frequency); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 5eb5a6f..88ad642 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -32,6 +32,7 @@ import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.HLCSegmentName; import org.apache.pinot.common.utils.SegmentName; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; @@ -53,10 +54,11 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea private long _lastUpdateRealtimeDocumentCountTimeMs = 0L; public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager, - PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics, - ControllerMetrics controllerMetrics) { + LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, + ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) { super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(), - 
config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager, controllerMetrics); + config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager, + leadControllerManager, controllerMetrics); _llcRealtimeSegmentManager = llcRealtimeSegmentManager; _validationMetrics = validationMetrics; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java index db23301..c78c756 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java @@ -30,7 +30,7 @@ import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.utils.DataSize; -import org.apache.pinot.controller.ControllerLeadershipManager; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.util.TableSizeReader; import org.slf4j.Logger; @@ -48,16 +48,16 @@ public class StorageQuotaChecker { private final TableConfig _tableConfig; private final ControllerMetrics _controllerMetrics; private final PinotHelixResourceManager _pinotHelixResourceManager; - private final ControllerLeadershipManager _controllerLeadershipManager; + private final LeadControllerManager _leadControllerManager; public StorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader, ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager, - ControllerLeadershipManager controllerLeadershipManager) { + LeadControllerManager leadControllerManager) { _tableConfig = tableConfig; _tableSizeReader = 
tableSizeReader; _controllerMetrics = controllerMetrics; _pinotHelixResourceManager = pinotHelixResourceManager; - _controllerLeadershipManager = controllerLeadershipManager; + _leadControllerManager = leadControllerManager; } public static class QuotaCheckerResponse { @@ -157,7 +157,7 @@ public class StorageQuotaChecker { tableNameWithType, tableSubtypeSize.estimatedSizeInBytes, tableSubtypeSize.reportedSizeInBytes); // Only emit the real percentage of storage quota usage by lead controller, otherwise emit 0L. - if (isLeader() && allowedStorageBytes != 0L) { + if (isLeader(tableNameWithType) && allowedStorageBytes != 0L) { long existingStorageQuotaUtilization = tableSubtypeSize.estimatedSizeInBytes * 100 / allowedStorageBytes; _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION, existingStorageQuotaUtilization); @@ -213,7 +213,7 @@ public class StorageQuotaChecker { } } - protected boolean isLeader() { - return _controllerLeadershipManager.isLeader(); + protected boolean isLeader(String tableName) { + return _leadControllerManager.isLeaderForTable(tableName); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java index d91e612..511c349 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java @@ -95,9 +95,9 @@ public class PinotControllerModeTest extends ControllerTest { _controllerStarter = null; } - // TODO: enable it after removing ControllerLeadershipManager which requires both CONTROLLER and PARTICIPANT + // TODO: enable it after removing HelixControllerLeadershipManager which requires both CONTROLLER and PARTICIPANT // HelixManager - @Test(enabled = false) + @Test public void testPinotOnlyController() throws 
Exception { config.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY); @@ -123,8 +123,13 @@ public class PinotControllerModeTest extends ControllerTest { TestUtils.waitForCondition(aVoid -> helixControllerManager.isConnected(), TIMEOUT_IN_MS, "Failed to start " + config2.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms."); - // Enable the lead controller resource. - helixAdmin.enableResource(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true); + try { + // Enable the lead controller resource. + helixAdmin.enableResource(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true); + Assert.fail("Enabling resource before starting the 1st Pinot controller should fail."); + } catch (Exception e) { + // Expected. + } // Starting a pinot only controller. ControllerConf config3 = getDefaultControllerConfiguration(); @@ -141,6 +146,9 @@ public class PinotControllerModeTest extends ControllerTest { TIMEOUT_IN_MS, "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms."); Assert.assertEquals(firstPinotOnlyController.getControllerMode(), ControllerConf.ControllerMode.PINOT_ONLY); + // Enable the lead controller resource. + helixAdmin.enableResource(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true); + // Start a second Pinot only controller. 
ControllerConf config4 = getDefaultControllerConfiguration(); config4.setHelixClusterName(getHelixClusterName()); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index c235483..066040e 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -35,10 +35,12 @@ import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.testng.Assert; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -46,6 +48,7 @@ import static org.mockito.Mockito.when; public class SegmentStatusCheckerTest { private SegmentStatusChecker segmentStatusChecker; private PinotHelixResourceManager helixResourceManager; + private LeadControllerManager leadControllerManager; private MetricsRegistry metricsRegistry; private ControllerMetrics controllerMetrics; private ControllerConf config; @@ -84,9 +87,14 @@ public class SegmentStatusCheckerTest { when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); } + { + leadControllerManager = mock(LeadControllerManager.class); + when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); + } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new 
SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = + new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics); segmentStatusChecker.start(); segmentStatusChecker.run(); Assert.assertEquals( @@ -146,9 +154,14 @@ public class SegmentStatusCheckerTest { when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); } + { + leadControllerManager = mock(LeadControllerManager.class); + when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); + } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = + new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics); segmentStatusChecker.start(); segmentStatusChecker.run(); Assert.assertEquals( @@ -222,9 +235,14 @@ public class SegmentStatusCheckerTest { when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(0); } + { + leadControllerManager = mock(LeadControllerManager.class); + when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); + } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = + new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics); segmentStatusChecker.start(); segmentStatusChecker.run(); Assert.assertEquals( @@ -264,9 +282,14 @@ public class SegmentStatusCheckerTest { when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); } + { + 
leadControllerManager = mock(LeadControllerManager.class); + when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); + } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = + new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics); segmentStatusChecker.start(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); @@ -291,9 +314,14 @@ public class SegmentStatusCheckerTest { when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); } + { + leadControllerManager = mock(LeadControllerManager.class); + when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); + } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = + new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics); segmentStatusChecker.start(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), @@ -349,9 +377,14 @@ public class SegmentStatusCheckerTest { when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); } + { + leadControllerManager = mock(LeadControllerManager.class); + when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); + } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new 
SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = + new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics); segmentStatusChecker.start(); segmentStatusChecker.run(); Assert.assertEquals( @@ -390,9 +423,14 @@ public class SegmentStatusCheckerTest { when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); } + { + leadControllerManager = mock(LeadControllerManager.class); + when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); + } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = + new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics); segmentStatusChecker.start(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); @@ -429,9 +467,14 @@ public class SegmentStatusCheckerTest { when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); } + { + leadControllerManager = mock(LeadControllerManager.class); + when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); + } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = + new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics); // verify state before test Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0); // update metrics @@ -463,9 +506,14 @@ public class 
SegmentStatusCheckerTest { when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); } + { + leadControllerManager = mock(LeadControllerManager.class); + when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); + } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = + new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics); // verify state before test Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0); // update metrics @@ -508,9 +556,14 @@ public class SegmentStatusCheckerTest { when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); } + { + leadControllerManager = mock(LeadControllerManager.class); + when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); + } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = + new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics); segmentStatusChecker.start(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java index a928922..dcd5469 100644 --- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java @@ -27,10 +27,12 @@ import java.util.stream.IntStream; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; @@ -43,6 +45,7 @@ public class ControllerPeriodicTaskTest { private final ControllerConf _controllerConf = new ControllerConf(); private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class); + private final LeadControllerManager _leadControllerManager = mock(LeadControllerManager.class); private final ControllerMetrics _controllerMetrics = new ControllerMetrics(new MetricsRegistry()); private final AtomicBoolean _startTaskCalled = new AtomicBoolean(); private final AtomicBoolean _stopTaskCalled = new AtomicBoolean(); @@ -52,7 +55,8 @@ public class ControllerPeriodicTaskTest { private static final String TASK_NAME = "TestTask"; private final ControllerPeriodicTask _task = new ControllerPeriodicTask<Void>(TASK_NAME, RUN_FREQUENCY_IN_SECONDS, - _controllerConf.getPeriodicTaskInitialDelayInSeconds(), _resourceManager, _controllerMetrics) { + _controllerConf.getPeriodicTaskInitialDelayInSeconds(), _resourceManager, _leadControllerManager, + _controllerMetrics) { @Override protected void setUpTask() { @@ -81,6 +85,7 @@ public class ControllerPeriodicTaskTest { List<String> tables = new 
ArrayList<>(_numTables); IntStream.range(0, _numTables).forEach(i -> tables.add("table_" + i + " _OFFLINE")); when(_resourceManager.getAllTables()).thenReturn(tables); + when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); } private void resetState() { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 04e2b7b..af35420 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -56,7 +56,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.controller.ControllerConf; -import org.apache.pinot.controller.ControllerLeadershipManager; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.api.resources.LLCSegmentCompletionHandlers; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder; @@ -66,7 +66,6 @@ import org.apache.pinot.core.indexsegment.generator.SegmentVersion; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.core.realtime.stream.OffsetCriteria; import org.apache.pinot.core.realtime.stream.StreamConfig; -import org.apache.pinot.core.realtime.stream.StreamConfigProperties; import org.apache.pinot.core.segment.index.SegmentMetadataImpl; import org.apache.pinot.filesystem.PinotFSFactory; import org.apache.zookeeper.data.Stat; @@ -1323,7 +1322,7 @@ public class PinotLLCRealtimeSegmentManagerTest { protected 
FakePinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager, List<String> existingLLCSegments, ControllerMetrics controllerMetrics) { super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics, - new ControllerLeadershipManager(pinotHelixResourceManager.getHelixZkManager(), controllerMetrics)); + new LeadControllerManager()); try { TableConfigCache mockCache = mock(TableConfigCache.class); @@ -1513,7 +1512,7 @@ public class PinotLLCRealtimeSegmentManagerTest { } @Override - protected boolean isLeader() { + protected boolean isLeader(String tableName) { return IS_LEADER; } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java index b05f9bf..22e317f 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java @@ -30,7 +30,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.controller.ControllerConf; -import org.apache.pinot.controller.ControllerLeadershipManager; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; import org.apache.zookeeper.data.Stat; @@ -40,7 +40,6 @@ import org.testng.annotations.Test; import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.ControllerResponseStatus; import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.Request; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import 
static org.mockito.Mockito.when; @@ -1151,8 +1150,7 @@ public class SegmentCompletionTest { protected MockPinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics controllerMetrics) { - super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics, - new ControllerLeadershipManager(pinotHelixResourceManager.getHelixZkManager(), controllerMetrics)); + super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics, new LeadControllerManager()); } @Override @@ -1210,7 +1208,7 @@ public class SegmentCompletionTest { protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader, ControllerMetrics controllerMetrics) { super(helixManager, segmentManager, controllerMetrics, - new ControllerLeadershipManager(helixManager, controllerMetrics), + new LeadControllerManager(), SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds()); _isLeader = isLeader; } @@ -1221,7 +1219,7 @@ public class SegmentCompletionTest { } @Override - protected boolean isLeader() { + protected boolean isLeader(String tableName) { return _isLeader; } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java index 8c5c8ec..9316062 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java @@ -34,12 +34,14 @@ import org.apache.helix.model.builder.CustomModeISBuilder; import org.apache.pinot.common.config.RealtimeTagConfig; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; import 
org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -48,6 +50,7 @@ public class RealtimeSegmentRelocatorTest { private TestRealtimeSegmentRelocator _realtimeSegmentRelocator; private HelixManager _mockHelixManager; + private LeadControllerManager _leadControllerManager; private String[] serverNames; private String[] consumingServerNames; @@ -69,10 +72,13 @@ public class RealtimeSegmentRelocatorTest { PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class); _mockHelixManager = mock(HelixManager.class); when(mockPinotHelixResourceManager.getHelixZkManager()).thenReturn(_mockHelixManager); + LeadControllerManager mockLeadControllerManager = mock(LeadControllerManager.class); + when(mockLeadControllerManager.isLeaderForTable(anyString())).thenReturn(true); ControllerConf controllerConfig = new ControllerConf(); ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry()); _realtimeSegmentRelocator = - new TestRealtimeSegmentRelocator(mockPinotHelixResourceManager, controllerConfig, controllerMetrics); + new TestRealtimeSegmentRelocator(mockPinotHelixResourceManager, mockLeadControllerManager, controllerConfig, + controllerMetrics); final int maxInstances = 20; serverNames = new String[maxInstances]; @@ -268,9 +274,9 @@ public class RealtimeSegmentRelocatorTest { private Map<String, List<String>> tagToInstances; - public TestRealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config, - ControllerMetrics controllerMetrics) { - super(pinotHelixResourceManager, config, controllerMetrics); + public 
TestRealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, + LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) { + super(pinotHelixResourceManager, leadControllerManager, config, controllerMetrics); tagToInstances = new HashedMap(); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java index d4adfe1..465a9af 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java @@ -35,6 +35,7 @@ import org.apache.pinot.common.segment.SegmentMetadata; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder; import org.apache.pinot.controller.helix.core.SegmentDeletionManager; @@ -84,6 +85,9 @@ public class RetentionManagerTest { PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class); setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager); + LeadControllerManager leadControllerManager = mock(LeadControllerManager.class); + when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); + when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList); @@ -91,7 +95,8 @@ public class RetentionManagerTest { ControllerMetrics controllerMetrics = new 
ControllerMetrics(new MetricsRegistry()); conf.setRetentionControllerFrequencyInSeconds(0); conf.setDeletedSegmentsRetentionInDays(0); - RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf, controllerMetrics); + RetentionManager retentionManager = + new RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, controllerMetrics); retentionManager.start(); retentionManager.run(); @@ -210,11 +215,14 @@ public class RetentionManagerTest { setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments); setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager); + LeadControllerManager leadControllerManager = mock(LeadControllerManager.class); + when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); + ControllerConf conf = new ControllerConf(); ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry()); conf.setRetentionControllerFrequencyInSeconds(0); conf.setDeletedSegmentsRetentionInDays(0); - RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf, controllerMetrics); + RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, controllerMetrics); retentionManager.start(); retentionManager.run(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java index e84a947..f8fb6ca 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java @@ -30,7 +30,7 @@ import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.metrics.ControllerGauge; import 
org.apache.pinot.common.metrics.ControllerMetrics; -import org.apache.pinot.controller.ControllerLeadershipManager; +import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.util.TableSizeReader; import org.mockito.invocation.InvocationOnMock; @@ -49,7 +49,7 @@ public class StorageQuotaCheckerTest { private TableConfig _tableConfig; private ControllerMetrics _controllerMetrics; private PinotHelixResourceManager _pinotHelixResourceManager; - private ControllerLeadershipManager _controllerLeadershipManager; + private LeadControllerManager _leadControllerManager; private QuotaConfig _quotaConfig; private SegmentsValidationAndRetentionConfig _validationConfig; private static final File TEST_DIR = new File(StorageQuotaCheckerTest.class.getName()); @@ -62,7 +62,7 @@ public class StorageQuotaCheckerTest { _controllerMetrics = new ControllerMetrics(new MetricsRegistry()); _validationConfig = mock(SegmentsValidationAndRetentionConfig.class); _pinotHelixResourceManager = mock(PinotHelixResourceManager.class); - _controllerLeadershipManager = mock(ControllerLeadershipManager.class); + _leadControllerManager = mock(LeadControllerManager.class); when(_tableConfig.getValidationConfig()).thenReturn(_validationConfig); when(_validationConfig.getReplicationNumber()).thenReturn(2); TEST_DIR.mkdirs(); @@ -78,10 +78,9 @@ public class StorageQuotaCheckerTest { throws InvalidConfigException { StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager, - _controllerLeadershipManager); + _leadControllerManager); when(_tableConfig.getQuotaConfig()).thenReturn(null); - StorageQuotaChecker.QuotaCheckerResponse res = - checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000); + StorageQuotaChecker.QuotaCheckerResponse res = checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000); 
Assert.assertTrue(res.isSegmentWithinQuota); } @@ -90,11 +89,10 @@ public class StorageQuotaCheckerTest { throws InvalidConfigException { StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager, - _controllerLeadershipManager); + _leadControllerManager); when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig); when(_quotaConfig.storageSizeBytes()).thenReturn(-1L); - StorageQuotaChecker.QuotaCheckerResponse res = - checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000); + StorageQuotaChecker.QuotaCheckerResponse res = checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000); Assert.assertTrue(res.isSegmentWithinQuota); } @@ -134,9 +132,8 @@ public class StorageQuotaCheckerTest { when(_quotaConfig.getStorage()).thenReturn("3K"); StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager, - _controllerLeadershipManager); - StorageQuotaChecker.QuotaCheckerResponse response = - checker.isSegmentStorageWithinQuota(TEST_DIR, "segment1", 1000); + _leadControllerManager); + StorageQuotaChecker.QuotaCheckerResponse response = checker.isSegmentStorageWithinQuota(TEST_DIR, "segment1", 1000); Assert.assertTrue(response.isSegmentWithinQuota); Assert.assertEquals( _controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION), 80L); @@ -184,12 +181,12 @@ public class StorageQuotaCheckerTest { public MockStorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader, ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager, - ControllerLeadershipManager controllerLeadershipManager) { - super(tableConfig, tableSizeReader, controllerMetrics, pinotHelixResourceManager, controllerLeadershipManager); + LeadControllerManager leadControllerManager) { + super(tableConfig, tableSizeReader, controllerMetrics, 
pinotHelixResourceManager, leadControllerManager); } @Override - protected boolean isLeader() { + protected boolean isLeader(String tableName) { return true; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java index 07b0087..9217259 100644 --- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java +++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java @@ -26,6 +26,7 @@ import org.apache.helix.BaseDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; import org.apache.helix.model.ExternalView; +import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.core.query.utils.Pair; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -113,7 +114,8 @@ public class ControllerLeaderLocator { */ private String getLeaderForTable(String rawTableName) { String leaderForTable; - ExternalView leadControllerResourceExternalView = _helixManager.getClusterManagmentTool().getResourceExternalView(_clusterName, LEAD_CONTROLLER_RESOURCE_NAME); + ExternalView leadControllerResourceExternalView = + _helixManager.getClusterManagmentTool().getResourceExternalView(_clusterName, LEAD_CONTROLLER_RESOURCE_NAME); String partitionLeader = getPartitionLeader(leadControllerResourceExternalView, rawTableName); if (partitionLeader != null) { leaderForTable = partitionLeader; @@ -146,11 +148,12 @@ public class ControllerLeaderLocator { return null; } int numPartitions = partitionSet.size(); - int partitionIndex = rawTableName.hashCode() % numPartitions; + int partitionIndex = HelixHelper.getHashCodeForTable(rawTableName) % numPartitions; String partitionName = LEAD_CONTROLLER_RESOURCE_NAME + "_" + partitionIndex; - Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partitionName); + Map<String, String> 
partitionStateMap = leadControllerResourceExternalView.getStateMap(partitionName); - for (Map.Entry<String, String> entry : stateMap.entrySet()) { + // Get master host from partition map. Return null if no master found. + for (Map.Entry<String, String> entry : partitionStateMap.entrySet()) { if ("MASTER".equals(entry.getValue())) { return entry.getKey(); } @@ -166,9 +169,11 @@ public class ControllerLeaderLocator { BaseDataAccessor<ZNRecord> dataAccessor = _helixManager.getHelixDataAccessor().getBaseDataAccessor(); Stat stat = new Stat(); try { - ZNRecord znRecord = dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST); + ZNRecord znRecord = + dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST); String helixLeader = znRecord.getId(); - LOGGER.info("Getting Helix leader: {} as per znode version {}, mtime {}", helixLeader, stat.getVersion(), stat.getMtime()); + LOGGER.info("Getting Helix leader: {} as per znode version {}, mtime {}", helixLeader, stat.getVersion(), + stat.getMtime()); return helixLeader; } catch (Exception e) { LOGGER.warn("Could not locate Helix leader", e); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
