This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch split_vm_tasks_2 in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit f7b13ea1a9b70ce604dca9dcd78891ba7b4e9f57 Author: Neha Pawar <[email protected]> AuthorDate: Fri Jan 4 15:50:52 2019 -0800 Split ValidationManager duties into separate ControllerPeriodicTasks --- .../linkedin/pinot/controller/ControllerConf.java | 135 +++++++++++------ .../pinot/controller/ControllerStarter.java | 41 ++++-- .../realtime/PinotLLCRealtimeSegmentManager.java | 4 +- .../helix/core/retention/RetentionManager.java | 12 +- .../BrokerResourceValidationManager.java | 80 ++++++++++ ...ger.java => OfflineSegmentIntervalChecker.java} | 116 ++------------- .../RealtimeSegmentValidationManager.java | 162 +++++++++++++++++++++ .../helix/core/retention/RetentionManagerTest.java | 11 +- .../validation/ValidationManagerTest.java | 14 +- .../tests/SegmentCompletionIntegrationTests.java | 4 +- .../admin/command/StartControllerCommand.java | 4 +- 11 files changed, 406 insertions(+), 177 deletions(-) diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java index df0c2df..1b496d9 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java @@ -54,15 +54,40 @@ public class ControllerConf extends PropertiesConfiguration { private static final String CONSOLE_WEBAPP_ROOT_PATH = "controller.query.console"; private static final String CONSOLE_WEBAPP_USE_HTTPS = "controller.query.console.useHttps"; private static final String EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT = "controller.upload.onlineToOfflineTimeout"; - private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds"; - private static final String VALIDATION_MANAGER_FREQUENCY_IN_SECONDS = "controller.validation.frequencyInSeconds"; - private static final String STATUS_CHECKER_FREQUENCY_IN_SECONDS = "controller.statuschecker.frequencyInSeconds"; - private static final String REALTIME_SEGMENT_RELOCATOR_FREQUENCY = "controller.realtime.segment.relocator.frequency"; - private static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS = "controller.statuschecker.waitForPushTimeInSeconds"; + + public static class ControllerPeriodicTasksConf { + private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds"; + private static final String OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = + "controller.offline.segment.interval.checker.frequencyInSeconds"; + private static final String REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = + "controller.realtime.segment.validation.frequencyInSeconds"; + private static final String BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = + "controller.broker.resource.validation.frequencyInSeconds"; + private static final String STATUS_CHECKER_FREQUENCY_IN_SECONDS = "controller.statuschecker.frequencyInSeconds"; + private static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS = + "controller.statuschecker.waitForPushTimeInSeconds"; + private static final String TASK_MANAGER_FREQUENCY_IN_SECONDS = "controller.task.frequencyInSeconds"; + private static final String REALTIME_SEGMENT_RELOCATOR_FREQUENCY = + "controller.realtime.segment.relocator.frequency"; + // Because segment level validation is expensive and requires heavy ZK access, we run segment level validation with a + // separate interval + private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = + "controller.segment.level.validation.intervalInSeconds"; + + private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours. + private static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours. + private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour. + private static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour. + private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes + private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes + private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled + private static final String DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY = "1h"; // 1 hour + private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60; + } + private static final String SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = "server.request.timeoutSeconds"; private static final String SEGMENT_COMMIT_TIMEOUT_SECONDS = "controller.realtime.segment.commit.timeoutSeconds"; private static final String DELETED_SEGMENTS_RETENTION_IN_DAYS = "controller.deleted.segments.retentionInDays"; - private static final String TASK_MANAGER_FREQUENCY_IN_SECONDS = "controller.task.frequencyInSeconds"; private static final String TABLE_MIN_REPLICAS = "table.minReplicas"; private static final String ENABLE_SPLIT_COMMIT = "controller.enable.split.commit"; private static final String JERSEY_ADMIN_API_PORT = "jersey.admin.api.port"; @@ -73,25 +98,17 @@ public class ControllerConf extends PropertiesConfiguration { private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = "controller.segment.upload.timeoutInMillis"; private static final String REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = "controller.realtime.segment.metadata.commit.numLocks"; private static final String ENABLE_STORAGE_QUOTA_CHECK = "controller.enable.storage.quota.check"; - // Because segment level validation is expensive and requires heavy ZK access, we run segment level validation with a - // separate interval - private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = - "controller.segment.level.validation.intervalInSeconds"; + private static final String ENABLE_BATCH_MESSAGE_MODE = "controller.enable.batch.message.mode"; // Defines the kind of storage and the underlying PinotFS implementation private static final String PINOT_FS_FACTORY_CLASS_PREFIX = "controller.storage.factory.class"; private static final String PINOT_FS_FACTORY_CLASS_LOCAL = "controller.storage.factory.class.file"; - private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours. - private static final int DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour. - private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes - private static final String DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY = "1h"; // 1 hour - private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes + private static final long DEFAULT_EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT_MILLIS = 120_000L; // 2 minutes private static final int DEFAULT_SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = 30; private static final int DEFAULT_DELETED_SEGMENTS_RETENTION_IN_DAYS = 7; - private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled private static final int DEFAULT_TABLE_MIN_REPLICAS = 1; private static final boolean DEFAULT_ENABLE_SPLIT_COMMIT = false; private static final int DEFAULT_JERSEY_ADMIN_PORT = 21000; @@ -101,7 +118,6 @@ public class ControllerConf extends PropertiesConfiguration { private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64; private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true; private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = true; - private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60; private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = LocalPinotFS.class.getName(); @@ -324,58 +340,91 @@ public class ControllerConf extends PropertiesConfiguration { } public int getRetentionControllerFrequencyInSeconds() { - if (containsKey(RETENTION_MANAGER_FREQUENCY_IN_SECONDS)) { - return Integer.parseInt((String) getProperty(RETENTION_MANAGER_FREQUENCY_IN_SECONDS)); + if (containsKey(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS)) { + return Integer.parseInt((String) getProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS)); } - return DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS; + return ControllerPeriodicTasksConf.DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS; } public void setRetentionControllerFrequencyInSeconds(int retentionFrequencyInSeconds) { - setProperty(RETENTION_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(retentionFrequencyInSeconds)); + setProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS, + Integer.toString(retentionFrequencyInSeconds)); + } + + public int getOfflineSegmentIntervalCheckerFrequencyInSeconds() { + if (containsKey(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS)) { + return Integer.parseInt( + (String) getProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS)); + } + return ControllerPeriodicTasksConf.DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS; + } + + public void setOfflineSegmentIntervalCheckerFrequencyInSeconds(int validationFrequencyInSeconds) { + setProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS, + Integer.toString(validationFrequencyInSeconds)); + } + + public int getRealtimeSegmentValidationFrequencyInSeconds() { + + if (containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS)) { + return Integer.parseInt( + (String) getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS)); + } + return ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS; + } + + public void setRealtimeSegmentValidationFrequencyInSeconds(int validationFrequencyInSeconds) { + setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS, + Integer.toString(validationFrequencyInSeconds)); } - public int getValidationControllerFrequencyInSeconds() { - if (containsKey(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) { - return Integer.parseInt((String) getProperty(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)); + public int getBrokerResourceValidationFrequencyInSeconds() { + if (containsKey(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS)) { + return Integer.parseInt( + (String) getProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS)); } - return DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS; + return ControllerPeriodicTasksConf.DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS; } - public void setValidationControllerFrequencyInSeconds(int validationFrequencyInSeconds) { - setProperty(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(validationFrequencyInSeconds)); + public void setBrokerResourceValidationFrequencyInSeconds(int validationFrequencyInSeconds) { + setProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS, + Integer.toString(validationFrequencyInSeconds)); } public int getStatusCheckerFrequencyInSeconds() { - if (containsKey(STATUS_CHECKER_FREQUENCY_IN_SECONDS)) { - return Integer.parseInt((String) getProperty(STATUS_CHECKER_FREQUENCY_IN_SECONDS)); + if (containsKey(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS)) { + return Integer.parseInt((String) getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS)); } - return DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS; + return ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS; } public void setStatusCheckerFrequencyInSeconds(int statusCheckerFrequencyInSeconds) { - setProperty(STATUS_CHECKER_FREQUENCY_IN_SECONDS, Integer.toString(statusCheckerFrequencyInSeconds)); + setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS, + Integer.toString(statusCheckerFrequencyInSeconds)); } public String getRealtimeSegmentRelocatorFrequency() { - if (containsKey(REALTIME_SEGMENT_RELOCATOR_FREQUENCY)) { - return (String) getProperty(REALTIME_SEGMENT_RELOCATOR_FREQUENCY); + if (containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY)) { + return (String) getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY); } - return DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY; + return ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY; } public void setRealtimeSegmentRelocatorFrequency(String relocatorFrequency) { - setProperty(REALTIME_SEGMENT_RELOCATOR_FREQUENCY, relocatorFrequency); + setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY, relocatorFrequency); } public int getStatusCheckerWaitForPushTimeInSeconds() { - if (containsKey(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS)) { - return Integer.parseInt((String) getProperty(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS)); + if (containsKey(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS)) { + return Integer.parseInt( + (String) getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS)); } - return DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS; + return ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS; } public void setStatusCheckerWaitForPushTimeInSeconds(int statusCheckerWaitForPushTimeInSeconds) { - setProperty(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS, Integer.toString(statusCheckerWaitForPushTimeInSeconds)); + setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS, + Integer.toString(statusCheckerWaitForPushTimeInSeconds)); } public long getExternalViewOnlineToOfflineTimeout() { @@ -417,11 +466,12 @@ public class ControllerConf extends PropertiesConfiguration { } public int getTaskManagerFrequencyInSeconds() { - return getInt(TASK_MANAGER_FREQUENCY_IN_SECONDS, DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS); + return getInt(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS, + ControllerPeriodicTasksConf.DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS); } public void setTaskManagerFrequencyInSeconds(int frequencyInSeconds) { - setProperty(TASK_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds)); + setProperty(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds)); } public int getDefaultTableMinReplicas() { @@ -469,6 +519,7 @@ public class ControllerConf extends PropertiesConfiguration { } public int getSegmentLevelValidationIntervalInSeconds() { - return getInt(SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS, DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS); + return getInt(ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS, + ControllerPeriodicTasksConf.DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS); } } diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java index 2799b44..07ca195 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java @@ -41,7 +41,9 @@ import com.linkedin.pinot.controller.helix.core.realtime.PinotRealtimeSegmentMan import com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory; import com.linkedin.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator; import com.linkedin.pinot.controller.helix.core.retention.RetentionManager; -import com.linkedin.pinot.controller.validation.ValidationManager; +import com.linkedin.pinot.controller.validation.BrokerResourceValidationManager; +import com.linkedin.pinot.controller.validation.OfflineSegmentIntervalChecker; +import com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager; import com.linkedin.pinot.core.crypt.PinotCrypterFactory; import com.linkedin.pinot.core.periodictask.PeriodicTask; import com.linkedin.pinot.filesystem.PinotFSFactory; @@ -86,7 +88,9 @@ public class ControllerStarter { private final ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler; // Can only be constructed after resource manager getting started - private ValidationManager _validationManager; + private OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker; + private RealtimeSegmentValidationManager _realtimeSegmentValidationManager; + private BrokerResourceValidationManager _brokerResourceValidationManager; private RealtimeSegmentRelocator _realtimeSegmentRelocator; private PinotHelixTaskResourceManager _helixTaskResourceManager; private PinotTaskManager _taskManager; @@ -95,8 +99,7 @@ public class ControllerStarter { _config = conf; _adminApp = new ControllerAdminApiApplication(_config.getQueryConsoleWebappPath(), _config.getQueryConsoleUseHttps()); _helixResourceManager = new PinotHelixResourceManager(_config); - _retentionManager = new RetentionManager(_helixResourceManager, _config.getRetentionControllerFrequencyInSeconds(), - _config.getDeletedSegmentsRetentionInDays()); + _retentionManager = new RetentionManager(_helixResourceManager, _config); _metricsRegistry = new MetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager); @@ -111,8 +114,16 @@ public class ControllerStarter { return _helixResourceManager; } - public ValidationManager getValidationManager() { - return _validationManager; + public OfflineSegmentIntervalChecker getOfflineSegmentIntervalChecker() { + return _offlineSegmentIntervalChecker; + } + + public RealtimeSegmentValidationManager getRealtimeSegmentValidationManager() { + return _realtimeSegmentValidationManager; + } + + public BrokerResourceValidationManager getBrokerResourceValidationManager() { + return _brokerResourceValidationManager; } public PinotHelixTaskResourceManager getHelixTaskResourceManager() { @@ -181,10 +192,18 @@ public class ControllerStarter { _taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _config, _controllerMetrics); periodicTasks.add(_taskManager); periodicTasks.add(_retentionManager); - _validationManager = - new ValidationManager(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(), + _offlineSegmentIntervalChecker = + new OfflineSegmentIntervalChecker(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(), + new ValidationMetrics(_metricsRegistry)); + _realtimeSegmentValidationManager = + new RealtimeSegmentValidationManager(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(), new ValidationMetrics(_metricsRegistry)); - periodicTasks.add(_validationManager); + _brokerResourceValidationManager = + new BrokerResourceValidationManager(_config, _helixResourceManager); + + periodicTasks.add(_offlineSegmentIntervalChecker); + periodicTasks.add(_realtimeSegmentValidationManager); + periodicTasks.add(_brokerResourceValidationManager); periodicTasks.add(_segmentStatusChecker); periodicTasks.add(_realtimeSegmentRelocator); @@ -351,7 +370,9 @@ public class ControllerStarter { conf.setControllerVipHost("localhost"); conf.setControllerVipProtocol("http"); conf.setRetentionControllerFrequencyInSeconds(3600 * 6); - conf.setValidationControllerFrequencyInSeconds(3600); + conf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(3600); + conf.setRealtimeSegmentValidationFrequencyInSeconds(3600); + conf.setBrokerResourceValidationFrequencyInSeconds(3600); conf.setStatusCheckerFrequencyInSeconds(5 * 60); conf.setRealtimeSegmentRelocatorFrequency("1h"); conf.setStatusCheckerWaitForPushTimeInSeconds(10 * 60); diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 9860ec1..e708abc 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -818,8 +818,8 @@ public class PinotLLCRealtimeSegmentManager { /** * An instance is reporting that it has stopped consuming a topic due to some error. * Mark the state of the segment to be OFFLINE in idealstate. - * When all replicas of this segment are marked offline, the ValidationManager, in its next - * run, will auto-create a new segment with the appropriate offset. + * When all replicas of this segment are marked offline, the {@link com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager}, + * in its next run, will auto-create a new segment with the appropriate offset. */ public void segmentStoppedConsuming(final LLCSegmentName segmentName, final String instance) { String rawTableName = segmentName.getTableName(); diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java index ec15f28..d0e0b2e 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java @@ -26,6 +26,7 @@ import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; import com.linkedin.pinot.common.utils.CommonConstants; import com.linkedin.pinot.common.utils.CommonConstants.Segment.Realtime.Status; import com.linkedin.pinot.common.utils.SegmentName; +import com.linkedin.pinot.controller.ControllerConf; import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager; import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import com.linkedin.pinot.controller.helix.core.retention.strategy.RetentionStrategy; @@ -52,10 +53,9 @@ public class RetentionManager extends ControllerPeriodicTask { private final int _deletedSegmentsRetentionInDays; - public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, int runFrequencyInSeconds, - int deletedSegmentsRetentionInDays) { - super("RetentionManager", runFrequencyInSeconds, pinotHelixResourceManager); - _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays; + public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config) { + super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(), pinotHelixResourceManager); + _deletedSegmentsRetentionInDays = config.getDeletedSegmentsRetentionInDays(); LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}", getIntervalInSeconds(), _deletedSegmentsRetentionInDays); @@ -148,7 +148,7 @@ public class RetentionManager extends ControllerPeriodicTask { // In progress segment, only check LLC segment if (SegmentName.isLowLevelConsumerSegmentName(segmentName)) { // Delete old LLC segment that hangs around. Do not delete segment that are current since there may be a race - // with ValidationManager trying to auto-create the LLC segment + // with RealtimeSegmentValidationManager trying to auto-create the LLC segment if (shouldDeleteInProgressLLCSegment(segmentName, idealState, realtimeSegmentZKMetadata)) { segmentsToDelete.add(segmentName); } @@ -172,7 +172,7 @@ public class RetentionManager extends ControllerPeriodicTask { return false; } // delete a segment only if it is old enough (5 days) or else, - // 1. latest segment could get deleted in the middle of repair by ValidationManager + // 1. latest segment could get deleted in the middle of repair by RealtimeSegmentValidationManager // 2. for a brand new segment, if this code kicks in after new metadata is created but ideal state entry is not yet created (between step 2 and 3), // the latest segment metadata could get marked for deletion if (System.currentTimeMillis() - realtimeSegmentZKMetadata.getCreationTime() diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java new file mode 100644 index 0000000..e3264c4 --- /dev/null +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected]) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.linkedin.pinot.controller.validation; + +import com.linkedin.pinot.common.config.TableConfig; +import com.linkedin.pinot.controller.ControllerConf; +import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager; +import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; +import java.util.List; +import java.util.Set; +import org.apache.helix.model.InstanceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Rebuilds the broker resource if the instance set has changed + */ +public class BrokerResourceValidationManager extends ControllerPeriodicTask { + private static final Logger LOGGER = LoggerFactory.getLogger(BrokerResourceValidationManager.class); + + private List<InstanceConfig> _instanceConfigs; + + public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager) { + super("BrokerResourceValidationManager", config.getBrokerResourceValidationFrequencyInSeconds(), + pinotHelixResourceManager); + } + + @Override + protected void preprocess() { + // Cache instance configs to reduce ZK access + _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs(); + } + + @Override + protected void processTable(String tableNameWithType) { + try { + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + LOGGER.warn("Failed to find table config for table: {}, skipping broker resource validation", tableNameWithType); + return; + } + + // Rebuild broker resource + Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs, + tableConfig.getTenantConfig().getBroker()); + _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances); + } catch (Exception e) { + LOGGER.warn("Caught exception while validating broker resource for table: {}", tableNameWithType, e); + } + } + + + @Override + protected void postprocess() { + + } + + @Override + protected void initTask() { + + } + + @Override + public void stopTask() { + } +} diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java similarity index 65% rename from pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java rename to pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java index c5528e7..e952437 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java @@ -19,29 +19,20 @@ package com.linkedin.pinot.controller.validation; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.linkedin.pinot.common.config.SegmentsValidationAndRetentionConfig; import com.linkedin.pinot.common.config.TableConfig; import com.linkedin.pinot.common.config.TableNameBuilder; import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata; -import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; import com.linkedin.pinot.common.metrics.ValidationMetrics; import com.linkedin.pinot.common.utils.CommonConstants; -import com.linkedin.pinot.common.utils.HLCSegmentName; -import com.linkedin.pinot.common.utils.SegmentName; import com.linkedin.pinot.common.utils.time.TimeUtils; import com.linkedin.pinot.controller.ControllerConf; import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager; import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; -import com.linkedin.pinot.core.realtime.stream.StreamConfig; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; -import org.apache.helix.model.InstanceConfig; import org.joda.time.Duration; import org.joda.time.Interval; import org.slf4j.Logger; @@ -52,53 +43,26 @@ import org.slf4j.LoggerFactory; * Manages the segment validation metrics, to ensure that all offline segments are contiguous (no missing segments) and * that the offline push delay isn't too high. */ -public class ValidationManager extends ControllerPeriodicTask { - private static final Logger LOGGER = LoggerFactory.getLogger(ValidationManager.class); +public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask { + private static final Logger LOGGER = LoggerFactory.getLogger(OfflineSegmentIntervalChecker.class); - private final int _segmentLevelValidationIntervalInSeconds; - private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager; - private final ValidationMetrics _validationMetrics; + protected final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager; + protected final ValidationMetrics _validationMetrics; - private long _lastSegmentLevelValidationTimeMs = 0L; - private boolean _runSegmentLevelValidation; - private List<InstanceConfig> _instanceConfigs; - - public ValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager, + public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) { - super("ValidationManager", config.getValidationControllerFrequencyInSeconds(), pinotHelixResourceManager); - _segmentLevelValidationIntervalInSeconds = config.getSegmentLevelValidationIntervalInSeconds(); - Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0); + super("OfflineSegmentIntervalChecker", config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(), + pinotHelixResourceManager); _llcRealtimeSegmentManager = llcRealtimeSegmentManager; _validationMetrics = validationMetrics; } @Override protected void preprocess() { - // Run segment level validation using a separate interval - _runSegmentLevelValidation = false; - long currentTimeMs = System.currentTimeMillis(); - if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastSegmentLevelValidationTimeMs) - >= _segmentLevelValidationIntervalInSeconds) { - LOGGER.info("Run segment-level validation"); - _runSegmentLevelValidation = true; - _lastSegmentLevelValidationTimeMs = currentTimeMs; - } - - // Cache instance configs to reduce ZK access - _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs(); } @Override protected void processTable(String tableNameWithType) { - runValidation(tableNameWithType); - } - - @Override - protected void postprocess() { - - } - - private void runValidation(String tableNameWithType) { try { TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); if (tableConfig == null) { @@ -106,29 +70,12 @@ public class ValidationManager extends ControllerPeriodicTask { return; } - // Rebuild broker resource - Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs, - tableConfig.getTenantConfig().getBroker()); - _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances); - - // Perform validation based on the table type CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); if (tableType == CommonConstants.Helix.TableType.OFFLINE) { - if (_runSegmentLevelValidation) { - validateOfflineSegmentPush(tableConfig); - } - } else { - if (_runSegmentLevelValidation) { - updateRealtimeDocumentCount(tableConfig); - } - Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs(); - StreamConfig streamConfig = new StreamConfig(streamConfigMap); - if (streamConfig.hasLowLevelConsumerType()) { - _llcRealtimeSegmentManager.validateLLCSegments(tableConfig); - } + validateOfflineSegmentPush(tableConfig); } } catch (Exception e) { - LOGGER.warn("Caught exception while validating table: {}", tableNameWithType, e); + LOGGER.warn("Caught exception while checking offline segment intervals for table: {}", tableNameWithType, e); } } @@ -264,50 +211,9 @@ public class ValidationManager extends ControllerPeriodicTask { return numTotalDocs; } - private void updateRealtimeDocumentCount(TableConfig tableConfig) { - String realtimeTableName = tableConfig.getTableName(); - List<RealtimeSegmentZKMetadata> metadataList = - _pinotHelixResourceManager.getRealtimeSegmentMetadata(realtimeTableName); - boolean countHLCSegments = true; // false if this table has ONLY LLC segments (i.e. fully migrated) - StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs()); - if (streamConfig.hasLowLevelConsumerType() && !streamConfig.hasHighLevelConsumerType()) { - countHLCSegments = false; - } - // Update the gauge to contain the total document count in the segments - _validationMetrics.updateTotalDocumentCountGauge(tableConfig.getTableName(), - computeRealtimeTotalDocumentInSegments(metadataList, countHLCSegments)); - } - - @VisibleForTesting - static long computeRealtimeTotalDocumentInSegments(List<RealtimeSegmentZKMetadata> realtimeSegmentZKMetadataList, - boolean countHLCSegments) { - long numTotalDocs = 0; - - String groupId = ""; - for (RealtimeSegmentZKMetadata realtimeSegmentZKMetadata : realtimeSegmentZKMetadataList) { - String segmentName = realtimeSegmentZKMetadata.getSegmentName(); - if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) { - if (countHLCSegments) { - HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName); - String segmentGroupIdName = hlcSegmentName.getGroupId(); - - if (groupId.isEmpty()) { - groupId = segmentGroupIdName; - } - // Discard all segments with different groupids as they are replicas - if (groupId.equals(segmentGroupIdName) && realtimeSegmentZKMetadata.getTotalRawDocs() >= 0) { - numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs(); - } - } - } else { - // Low level segments - if (!countHLCSegments) { - numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs(); - } - } - } + @Override + protected void postprocess() { - return numTotalDocs; } @Override diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java new file mode 100644 index 0000000..71e50d2 --- /dev/null +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -0,0 +1,162 @@ +/** + * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected]) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.linkedin.pinot.controller.validation; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.linkedin.pinot.common.config.TableConfig; +import com.linkedin.pinot.common.config.TableNameBuilder; +import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; +import com.linkedin.pinot.common.metrics.ValidationMetrics; +import com.linkedin.pinot.common.utils.CommonConstants; +import com.linkedin.pinot.common.utils.HLCSegmentName; +import com.linkedin.pinot.common.utils.SegmentName; +import com.linkedin.pinot.controller.ControllerConf; +import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager; +import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; +import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; +import com.linkedin.pinot.core.realtime.stream.StreamConfig; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Validates realtime ideal states and segment metadata, fixing any partitions which have stopped consuming + */ +public class RealtimeSegmentValidationManager extends ControllerPeriodicTask { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentValidationManager.class); + + protected final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager; + protected final ValidationMetrics _validationMetrics; + + private final int _segmentLevelValidationIntervalInSeconds; + private long _lastUpdateRealtimeDocumentCountTimeMs = 0L; + private boolean _updateRealtimeDocumentCount; + + public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager, + PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) { + super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(), + pinotHelixResourceManager); + _llcRealtimeSegmentManager = llcRealtimeSegmentManager; + _validationMetrics = validationMetrics; + + _segmentLevelValidationIntervalInSeconds = config.getSegmentLevelValidationIntervalInSeconds(); + Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0); + } + + @Override + protected void preprocess() { + // Update realtime document counts only if certain time has passed after previous run + _updateRealtimeDocumentCount = false; + long currentTimeMs = System.currentTimeMillis(); + if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastUpdateRealtimeDocumentCountTimeMs) + >= _segmentLevelValidationIntervalInSeconds) { + LOGGER.info("Run segment-level validation"); + _updateRealtimeDocumentCount = true; + _lastUpdateRealtimeDocumentCountTimeMs = currentTimeMs; + } + } + + @Override + protected void processTable(String tableNameWithType) { + try { + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType); + return; + } + + CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType == CommonConstants.Helix.TableType.REALTIME) { + if (_updateRealtimeDocumentCount) { + updateRealtimeDocumentCount(tableConfig); + } + } + Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs(); + StreamConfig streamConfig = new StreamConfig(streamConfigMap); + if (streamConfig.hasLowLevelConsumerType()) { + _llcRealtimeSegmentManager.validateLLCSegments(tableConfig); + } + } catch (Exception e) { + LOGGER.warn("Caught exception while validating realtime table: {}", tableNameWithType, e); + } + } + + private void updateRealtimeDocumentCount(TableConfig tableConfig) { + String realtimeTableName = tableConfig.getTableName(); + List<RealtimeSegmentZKMetadata> metadataList = + _pinotHelixResourceManager.getRealtimeSegmentMetadata(realtimeTableName); + boolean countHLCSegments = true; // false if this table has ONLY LLC segments (i.e. fully migrated) + StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs()); + if (streamConfig.hasLowLevelConsumerType() && !streamConfig.hasHighLevelConsumerType()) { + countHLCSegments = false; + } + // Update the gauge to contain the total document count in the segments + _validationMetrics.updateTotalDocumentCountGauge(tableConfig.getTableName(), + computeRealtimeTotalDocumentInSegments(metadataList, countHLCSegments)); + } + + @VisibleForTesting + static long computeRealtimeTotalDocumentInSegments(List<RealtimeSegmentZKMetadata> realtimeSegmentZKMetadataList, + boolean countHLCSegments) { + long numTotalDocs = 0; + + String groupId = ""; + for (RealtimeSegmentZKMetadata realtimeSegmentZKMetadata : realtimeSegmentZKMetadataList) { + String segmentName = realtimeSegmentZKMetadata.getSegmentName(); + if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) { + if (countHLCSegments) { + HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName); + String segmentGroupIdName = hlcSegmentName.getGroupId(); + + if (groupId.isEmpty()) { + groupId = segmentGroupIdName; + } + // Discard all segments with different groupids as they are replicas + if (groupId.equals(segmentGroupIdName) && realtimeSegmentZKMetadata.getTotalRawDocs() >= 0) { + numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs(); + } + } + } else { + // Low level segments + if (!countHLCSegments) { + numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs(); + } + } + } + + return numTotalDocs; + } + + @Override + protected void postprocess() { + + } + + @Override + protected void initTask() { + + } + + @Override + public void stopTask() { + LOGGER.info("Unregister all the validation metrics."); + _validationMetrics.unregisterAllMetrics(); + } +} diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java index 3e60350..d2fe9b3 100644 --- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java +++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java @@ -26,6 +26,7 @@ import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; import com.linkedin.pinot.common.segment.SegmentMetadata; import com.linkedin.pinot.common.utils.CommonConstants; import com.linkedin.pinot.common.utils.LLCSegmentName; +import com.linkedin.pinot.controller.ControllerConf; import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager; import com.linkedin.pinot.controller.helix.core.PinotTableIdealStateBuilder; import com.linkedin.pinot.controller.helix.core.SegmentDeletionManager; @@ -87,7 +88,10 @@ public class RetentionManagerTest { when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList); - RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0); + ControllerConf conf = new ControllerConf(); + conf.setRetentionControllerFrequencyInSeconds(0); + conf.setDeletedSegmentsRetentionInDays(0); + RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf); retentionManager.init(); retentionManager.run(); @@ -204,7 +208,10 @@ public class RetentionManagerTest { setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments); setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager); - RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0); + ControllerConf conf = new ControllerConf(); + conf.setRetentionControllerFrequencyInSeconds(0); + conf.setDeletedSegmentsRetentionInDays(0); + RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf); retentionManager.init(); retentionManager.run(); diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java index 0526afb..f69657b 100644 --- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java +++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java @@ -52,7 +52,7 @@ import static org.testng.Assert.*; /** - * Tests for the ValidationManager. + * Tests for the ValidationManagers. */ public class ValidationManagerTest { private String HELIX_CLUSTER_NAME = "TestValidationManager"; @@ -178,7 +178,7 @@ public class ValidationManagerTest { segmentZKMetadataList.add( SegmentMetadataMockUtils.mockRealtimeSegmentZKMetadata(TEST_TABLE_NAME, segmentName4, 20)); - assertEquals(ValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, true), 60); + assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, true), 60); // Now add some low level segment names String segmentName5 = new LLCSegmentName(TEST_TABLE_NAME, 1, 0, 1000).getSegmentName(); @@ -188,7 +188,7 @@ public class ValidationManagerTest { segmentZKMetadataList.add(SegmentMetadataMockUtils.mockRealtimeSegmentZKMetadata(TEST_TABLE_NAME, segmentName6, 5)); // Only the LLC segments should get counted. - assertEquals(ValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, false), 15); + assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, false), 15); } @AfterClass @@ -210,18 +210,18 @@ public class ValidationManagerTest { jan1st2nd3rd.add(jan1st); jan1st2nd3rd.add(jan2nd); jan1st2nd3rd.add(jan3rd); - assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd3rd, Duration.standardDays(1)), 0); + assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd3rd, Duration.standardDays(1)), 0); ArrayList<Interval> jan1st2nd3rd5th = new ArrayList<>(jan1st2nd3rd); jan1st2nd3rd5th.add(jan5th); - assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd3rd5th, Duration.standardDays(1)), 1); + assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd3rd5th, Duration.standardDays(1)), 1); // Should also work if the intervals are in random order ArrayList<Interval> jan5th2nd1st = new ArrayList<>(); jan5th2nd1st.add(jan5th); jan5th2nd1st.add(jan2nd); jan5th2nd1st.add(jan1st); - assertEquals(ValidationManager.computeNumMissingSegments(jan5th2nd1st, Duration.standardDays(1)), 2); + assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan5th2nd1st, Duration.standardDays(1)), 2); // Should also work if the intervals are of different sizes Interval jan1stAnd2nd = new Interval(new DateTime(2015, 1, 1, 0, 0, 0), new DateTime(2015, 1, 2, 23, 59, 59)); @@ -229,6 +229,6 @@ public class ValidationManagerTest { jan1st2nd4th5th.add(jan1stAnd2nd); jan1st2nd4th5th.add(jan4th); jan1st2nd4th5th.add(jan5th); - assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd4th5th, Duration.standardDays(1)), 1); + assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd4th5th, Duration.standardDays(1)), 1); } } diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java index 28eb463..5ef1278 100644 --- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java +++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java @@ -28,7 +28,7 @@ import com.linkedin.pinot.common.utils.LLCSegmentName; import com.linkedin.pinot.common.utils.NetUtil; import com.linkedin.pinot.common.utils.ZkStarter; import com.linkedin.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator; -import com.linkedin.pinot.controller.validation.ValidationManager; +import com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager; import com.linkedin.pinot.server.realtime.ControllerLeaderLocator; import com.linkedin.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; import com.linkedin.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory; @@ -155,7 +155,7 @@ public class SegmentCompletionIntegrationTests extends LLCRealtimeClusterIntegra final String oldSegment = _currentSegment; // Now call the validation manager, and the segment should fix itself - ValidationManager validationManager = _controllerStarter.getValidationManager(); + RealtimeSegmentValidationManager validationManager = _controllerStarter.getRealtimeSegmentValidationManager(); validationManager.init(); validationManager.run(); diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java index 49c56b1..aa82182 100644 --- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java +++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java @@ -150,7 +150,9 @@ public class StartControllerCommand extends AbstractBaseAdminCommand implements conf.setTenantIsolationEnabled(_tenantIsolation); conf.setRetentionControllerFrequencyInSeconds(3600 * 6); - conf.setValidationControllerFrequencyInSeconds(3600); + conf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(3600); + conf.setRealtimeSegmentValidationFrequencyInSeconds(3600); + conf.setBrokerResourceValidationFrequencyInSeconds(3600); } LOGGER.info("Executing command: " + toString()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
