This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 493ed64  Split validation manager tasks into separate periodic tasks 
(#3668)
493ed64 is described below

commit 493ed643e1e5a852cf2549cfe76ef4b3f1e7e0d4
Author: Neha Pawar <[email protected]>
AuthorDate: Tue Jan 8 17:37:12 2019 -0800

    Split validation manager tasks into separate periodic tasks (#3668)
    
    Split ValidationManager duties into separate ControllerPeriodicTasks viz. 
OfflineSegmentIntervalChecker (to perform checks related to offline segments), 
RealtimeSegmentValidationManager (to perform validation of realtime consuming 
partitions), BrokerResourceValidationManager (to perform validation of broker 
resource). These have been split out because they need to run at different 
frequencies from one another.
---
 .../linkedin/pinot/controller/ControllerConf.java  | 161 ++++++++++++++------
 .../pinot/controller/ControllerStarter.java        |  40 +++--
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  10 +-
 .../helix/core/retention/RetentionManager.java     |  12 +-
 .../BrokerResourceValidationManager.java           |  82 ++++++++++
 ...ger.java => OfflineSegmentIntervalChecker.java} | 127 ++--------------
 .../RealtimeSegmentValidationManager.java          | 167 +++++++++++++++++++++
 .../PinotLLCRealtimeSegmentManagerTest.java        |  26 ++--
 .../helix/core/retention/RetentionManagerTest.java |  11 +-
 .../validation/ValidationManagerTest.java          |  14 +-
 .../tests/SegmentCompletionIntegrationTests.java   |   4 +-
 .../admin/command/StartControllerCommand.java      |   4 +-
 12 files changed, 459 insertions(+), 199 deletions(-)

diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index df0c2df..863f3cc 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -54,15 +54,43 @@ public class ControllerConf extends PropertiesConfiguration 
{
   private static final String CONSOLE_WEBAPP_ROOT_PATH = 
"controller.query.console";
   private static final String CONSOLE_WEBAPP_USE_HTTPS = 
"controller.query.console.useHttps";
   private static final String EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT = 
"controller.upload.onlineToOfflineTimeout";
-  private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = 
"controller.retention.frequencyInSeconds";
-  private static final String VALIDATION_MANAGER_FREQUENCY_IN_SECONDS = 
"controller.validation.frequencyInSeconds";
-  private static final String STATUS_CHECKER_FREQUENCY_IN_SECONDS = 
"controller.statuschecker.frequencyInSeconds";
-  private static final String REALTIME_SEGMENT_RELOCATOR_FREQUENCY = 
"controller.realtime.segment.relocator.frequency";
-  private static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 
"controller.statuschecker.waitForPushTimeInSeconds";
+
+  public static class ControllerPeriodicTasksConf {
+    private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = 
"controller.retention.frequencyInSeconds";
+    @Deprecated // The ValidationManager has been split up into 3 separate 
tasks, each having their own frequency config settings
+    private static final String 
DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS =
+        "controller.validation.frequencyInSeconds";
+    private static final String 
OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS =
+        "controller.offline.segment.interval.checker.frequencyInSeconds";
+    private static final String 
REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS =
+        "controller.realtime.segment.validation.frequencyInSeconds";
+    private static final String 
BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS =
+        "controller.broker.resource.validation.frequencyInSeconds";
+    private static final String STATUS_CHECKER_FREQUENCY_IN_SECONDS = 
"controller.statuschecker.frequencyInSeconds";
+    private static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS =
+        "controller.statuschecker.waitForPushTimeInSeconds";
+    private static final String TASK_MANAGER_FREQUENCY_IN_SECONDS = 
"controller.task.frequencyInSeconds";
+    private static final String REALTIME_SEGMENT_RELOCATOR_FREQUENCY =
+        "controller.realtime.segment.relocator.frequency";
+    // Because segment level validation is expensive and requires heavy ZK 
access, we run segment level validation with a
+    // separate interval
+    private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
+        "controller.segment.level.validation.intervalInSeconds";
+
+    private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS 
= 6 * 60 * 60; // 6 Hours.
+    private static final int 
DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 24 * 60 * 60; 
// 24 Hours.
+    private static final int 
DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+    private static final int 
DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+    private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 
5 * 60; // 5 minutes
+    private static final int 
DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
+    private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; 
// Disabled
+    private static final String DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY = 
"1h"; // 1 hour
+    private static final int 
DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
+  }
+
   private static final String SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = 
"server.request.timeoutSeconds";
   private static final String SEGMENT_COMMIT_TIMEOUT_SECONDS = 
"controller.realtime.segment.commit.timeoutSeconds";
   private static final String DELETED_SEGMENTS_RETENTION_IN_DAYS = 
"controller.deleted.segments.retentionInDays";
-  private static final String TASK_MANAGER_FREQUENCY_IN_SECONDS = 
"controller.task.frequencyInSeconds";
   private static final String TABLE_MIN_REPLICAS = "table.minReplicas";
   private static final String ENABLE_SPLIT_COMMIT = 
"controller.enable.split.commit";
   private static final String JERSEY_ADMIN_API_PORT = "jersey.admin.api.port";
@@ -73,25 +101,17 @@ public class ControllerConf extends 
PropertiesConfiguration {
   private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = 
"controller.segment.upload.timeoutInMillis";
   private static final String REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 
"controller.realtime.segment.metadata.commit.numLocks";
   private static final String ENABLE_STORAGE_QUOTA_CHECK = 
"controller.enable.storage.quota.check";
-  // Because segment level validation is expensive and requires heavy ZK 
access, we run segment level validation with a
-  // separate interval
-  private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
-      "controller.segment.level.validation.intervalInSeconds";
+
   private static final String ENABLE_BATCH_MESSAGE_MODE = 
"controller.enable.batch.message.mode";
 
   // Defines the kind of storage and the underlying PinotFS implementation
   private static final String PINOT_FS_FACTORY_CLASS_PREFIX = 
"controller.storage.factory.class";
   private static final String PINOT_FS_FACTORY_CLASS_LOCAL = 
"controller.storage.factory.class.file";
 
-  private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 
6 * 60 * 60; // 6 Hours.
-  private static final int DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS 
= 60 * 60; // 1 Hour.
-  private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 
* 60; // 5 minutes
-  private static final String DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY = 
"1h"; // 1 hour
-  private static final int 
DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
+
   private static final long 
DEFAULT_EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT_MILLIS = 120_000L; // 2 minutes
   private static final int DEFAULT_SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = 30;
   private static final int DEFAULT_DELETED_SEGMENTS_RETENTION_IN_DAYS = 7;
-  private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // 
Disabled
   private static final int DEFAULT_TABLE_MIN_REPLICAS = 1;
   private static final boolean DEFAULT_ENABLE_SPLIT_COMMIT = false;
   private static final int DEFAULT_JERSEY_ADMIN_PORT = 21000;
@@ -101,7 +121,6 @@ public class ControllerConf extends PropertiesConfiguration 
{
   private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 
64;
   private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
   private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = true;
-  private static final int 
DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
 
   private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = 
LocalPinotFS.class.getName();
 
@@ -324,58 +343,114 @@ public class ControllerConf extends 
PropertiesConfiguration {
   }
 
   public int getRetentionControllerFrequencyInSeconds() {
-    if (containsKey(RETENTION_MANAGER_FREQUENCY_IN_SECONDS)) {
-      return Integer.parseInt((String) 
getProperty(RETENTION_MANAGER_FREQUENCY_IN_SECONDS));
+    if 
(containsKey(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS))
 {
+      return Integer.parseInt((String) 
getProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS));
     }
-    return DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS;
+    return 
ControllerPeriodicTasksConf.DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS;
   }
 
   public void setRetentionControllerFrequencyInSeconds(int 
retentionFrequencyInSeconds) {
-    setProperty(RETENTION_MANAGER_FREQUENCY_IN_SECONDS, 
Integer.toString(retentionFrequencyInSeconds));
+    
setProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS,
+        Integer.toString(retentionFrequencyInSeconds));
   }
 
-  public int getValidationControllerFrequencyInSeconds() {
-    if (containsKey(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) {
-      return Integer.parseInt((String) 
getProperty(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS));
+  /**
+   * Returns the config value for 
controller.offline.segment.interval.checker.frequencyInSeconds if it exists.
+   * If it doesn't exist, returns the segment level validation interval. This 
is done in order to retain the current behavior,
+   * wherein the offline validation tasks were done at segment level 
validation interval frequency
+   * The default value is the new 
DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS
+   * @return
+   */
+  public int getOfflineSegmentIntervalCheckerFrequencyInSeconds() {
+    if 
(containsKey(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS))
 {
+      return Integer.parseInt(
+          (String) 
getProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS));
+    }
+    return 
getInt(ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
+        
ControllerPeriodicTasksConf.DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS);
+  }
+
+  public void setOfflineSegmentIntervalCheckerFrequencyInSeconds(int 
validationFrequencyInSeconds) {
+    
setProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS,
+        Integer.toString(validationFrequencyInSeconds));
+  }
+
+  /**
+   * Returns the config value for 
controller.realtime.segment.validation.frequencyInSeconds if it exists.
+   * If it doesn't exist, returns the validation controller frequency. This is 
done in order to retain the current behavior,
+   * wherein the realtime validation tasks were done at validation controller 
frequency
+   * The default value is the new 
DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS
+   * @return
+   */
+  public int getRealtimeSegmentValidationFrequencyInSeconds() {
+    if 
(containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS))
 {
+      return Integer.parseInt(
+          (String) 
getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS));
+    }
+    return 
getInt(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS,
+        
ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS);
+  }
+
+  public void setRealtimeSegmentValidationFrequencyInSeconds(int 
validationFrequencyInSeconds) {
+    
setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS,
+        Integer.toString(validationFrequencyInSeconds));
+  }
+
+  /**
+   * Returns the config value for  
controller.broker.resource.validation.frequencyInSeconds if it exists.
+   * If it doesn't exist, returns the validation controller frequency. This is 
done in order to retain the current behavior,
+   * wherein the broker resource validation tasks were done at validation 
controller frequency
+   * The default value is the new 
DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS
+   * @return
+   */
+  public int getBrokerResourceValidationFrequencyInSeconds() {
+    if 
(containsKey(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS))
 {
+      return Integer.parseInt(
+          (String) 
getProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS));
     }
-    return DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS;
+    return 
getInt(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS,
+        
ControllerPeriodicTasksConf.DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS);
   }
 
-  public void setValidationControllerFrequencyInSeconds(int 
validationFrequencyInSeconds) {
-    setProperty(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS, 
Integer.toString(validationFrequencyInSeconds));
+  public void setBrokerResourceValidationFrequencyInSeconds(int 
validationFrequencyInSeconds) {
+    
setProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS,
+        Integer.toString(validationFrequencyInSeconds));
   }
 
   public int getStatusCheckerFrequencyInSeconds() {
-    if (containsKey(STATUS_CHECKER_FREQUENCY_IN_SECONDS)) {
-      return Integer.parseInt((String) 
getProperty(STATUS_CHECKER_FREQUENCY_IN_SECONDS));
+    if 
(containsKey(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt((String) 
getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS));
     }
-    return DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS;
+    return 
ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS;
   }
 
   public void setStatusCheckerFrequencyInSeconds(int 
statusCheckerFrequencyInSeconds) {
-    setProperty(STATUS_CHECKER_FREQUENCY_IN_SECONDS, 
Integer.toString(statusCheckerFrequencyInSeconds));
+    
setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS,
+        Integer.toString(statusCheckerFrequencyInSeconds));
   }
 
   public String getRealtimeSegmentRelocatorFrequency() {
-    if (containsKey(REALTIME_SEGMENT_RELOCATOR_FREQUENCY)) {
-      return (String) getProperty(REALTIME_SEGMENT_RELOCATOR_FREQUENCY);
+    if 
(containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY)) 
{
+      return (String) 
getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY);
     }
-    return DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY;
+    return 
ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY;
   }
 
   public void setRealtimeSegmentRelocatorFrequency(String relocatorFrequency) {
-    setProperty(REALTIME_SEGMENT_RELOCATOR_FREQUENCY, relocatorFrequency);
+    
setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY, 
relocatorFrequency);
   }
 
   public int getStatusCheckerWaitForPushTimeInSeconds() {
-    if (containsKey(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS)) {
-      return Integer.parseInt((String) 
getProperty(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS));
+    if 
(containsKey(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS))
 {
+      return Integer.parseInt(
+          (String) 
getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS));
     }
-    return DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS;
+    return 
ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS;
   }
 
   public void setStatusCheckerWaitForPushTimeInSeconds(int 
statusCheckerWaitForPushTimeInSeconds) {
-    setProperty(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS, 
Integer.toString(statusCheckerWaitForPushTimeInSeconds));
+    
setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS,
+        Integer.toString(statusCheckerWaitForPushTimeInSeconds));
   }
 
   public long getExternalViewOnlineToOfflineTimeout() {
@@ -417,11 +492,12 @@ public class ControllerConf extends 
PropertiesConfiguration {
   }
 
   public int getTaskManagerFrequencyInSeconds() {
-    return getInt(TASK_MANAGER_FREQUENCY_IN_SECONDS, 
DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS);
+    return 
getInt(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS,
+        ControllerPeriodicTasksConf.DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS);
   }
 
   public void setTaskManagerFrequencyInSeconds(int frequencyInSeconds) {
-    setProperty(TASK_MANAGER_FREQUENCY_IN_SECONDS, 
Integer.toString(frequencyInSeconds));
+    setProperty(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS, 
Integer.toString(frequencyInSeconds));
   }
 
   public int getDefaultTableMinReplicas() {
@@ -469,6 +545,7 @@ public class ControllerConf extends PropertiesConfiguration 
{
   }
 
   public int getSegmentLevelValidationIntervalInSeconds() {
-    return getInt(SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS, 
DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS);
+    return 
getInt(ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
+        
ControllerPeriodicTasksConf.DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS);
   }
 }
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index 2799b44..2bca8ae 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -41,7 +41,9 @@ import 
com.linkedin.pinot.controller.helix.core.realtime.PinotRealtimeSegmentMan
 import 
com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
 import 
com.linkedin.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator;
 import com.linkedin.pinot.controller.helix.core.retention.RetentionManager;
-import com.linkedin.pinot.controller.validation.ValidationManager;
+import 
com.linkedin.pinot.controller.validation.BrokerResourceValidationManager;
+import com.linkedin.pinot.controller.validation.OfflineSegmentIntervalChecker;
+import 
com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager;
 import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
 import com.linkedin.pinot.core.periodictask.PeriodicTask;
 import com.linkedin.pinot.filesystem.PinotFSFactory;
@@ -86,7 +88,9 @@ public class ControllerStarter {
   private final ControllerPeriodicTaskScheduler 
_controllerPeriodicTaskScheduler;
 
   // Can only be constructed after resource manager getting started
-  private ValidationManager _validationManager;
+  private OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker;
+  private RealtimeSegmentValidationManager _realtimeSegmentValidationManager;
+  private BrokerResourceValidationManager _brokerResourceValidationManager;
   private RealtimeSegmentRelocator _realtimeSegmentRelocator;
   private PinotHelixTaskResourceManager _helixTaskResourceManager;
   private PinotTaskManager _taskManager;
@@ -95,8 +99,7 @@ public class ControllerStarter {
     _config = conf;
     _adminApp = new 
ControllerAdminApiApplication(_config.getQueryConsoleWebappPath(), 
_config.getQueryConsoleUseHttps());
     _helixResourceManager = new PinotHelixResourceManager(_config);
-    _retentionManager = new RetentionManager(_helixResourceManager, 
_config.getRetentionControllerFrequencyInSeconds(),
-        _config.getDeletedSegmentsRetentionInDays());
+    _retentionManager = new RetentionManager(_helixResourceManager, _config);
     _metricsRegistry = new MetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
     _realtimeSegmentsManager = new 
PinotRealtimeSegmentManager(_helixResourceManager);
@@ -111,8 +114,16 @@ public class ControllerStarter {
     return _helixResourceManager;
   }
 
-  public ValidationManager getValidationManager() {
-    return _validationManager;
+  public OfflineSegmentIntervalChecker getOfflineSegmentIntervalChecker() {
+    return _offlineSegmentIntervalChecker;
+  }
+
+  public RealtimeSegmentValidationManager 
getRealtimeSegmentValidationManager() {
+    return _realtimeSegmentValidationManager;
+  }
+
+  public BrokerResourceValidationManager getBrokerResourceValidationManager() {
+    return _brokerResourceValidationManager;
   }
 
   public PinotHelixTaskResourceManager getHelixTaskResourceManager() {
@@ -181,10 +192,17 @@ public class ControllerStarter {
     _taskManager = new PinotTaskManager(_helixTaskResourceManager, 
_helixResourceManager, _config, _controllerMetrics);
     periodicTasks.add(_taskManager);
     periodicTasks.add(_retentionManager);
-    _validationManager =
-        new ValidationManager(_config, _helixResourceManager, 
PinotLLCRealtimeSegmentManager.getInstance(),
+    _offlineSegmentIntervalChecker =
+        new OfflineSegmentIntervalChecker(_config, _helixResourceManager, new 
ValidationMetrics(_metricsRegistry));
+    _realtimeSegmentValidationManager =
+        new RealtimeSegmentValidationManager(_config, _helixResourceManager, 
PinotLLCRealtimeSegmentManager.getInstance(),
             new ValidationMetrics(_metricsRegistry));
-    periodicTasks.add(_validationManager);
+    _brokerResourceValidationManager =
+        new BrokerResourceValidationManager(_config, _helixResourceManager);
+
+    periodicTasks.add(_offlineSegmentIntervalChecker);
+    periodicTasks.add(_realtimeSegmentValidationManager);
+    periodicTasks.add(_brokerResourceValidationManager);
     periodicTasks.add(_segmentStatusChecker);
     periodicTasks.add(_realtimeSegmentRelocator);
 
@@ -351,7 +369,9 @@ public class ControllerStarter {
     conf.setControllerVipHost("localhost");
     conf.setControllerVipProtocol("http");
     conf.setRetentionControllerFrequencyInSeconds(3600 * 6);
-    conf.setValidationControllerFrequencyInSeconds(3600);
+    conf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(3600);
+    conf.setRealtimeSegmentValidationFrequencyInSeconds(3600);
+    conf.setBrokerResourceValidationFrequencyInSeconds(3600);
     conf.setStatusCheckerFrequencyInSeconds(5 * 60);
     conf.setRealtimeSegmentRelocatorFrequency("1h");
     conf.setStatusCheckerWaitForPushTimeInSeconds(10 * 60);
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9860ec1..a77750e 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -818,8 +818,8 @@ public class PinotLLCRealtimeSegmentManager {
   /**
    * An instance is reporting that it has stopped consuming a topic due to 
some error.
    * Mark the state of the segment to be OFFLINE in idealstate.
-   * When all replicas of this segment are marked offline, the 
ValidationManager, in its next
-   * run, will auto-create a new segment with the appropriate offset.
+   * When all replicas of this segment are marked offline, the {@link 
com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager},
+   * in its next run, will auto-create a new segment with the appropriate 
offset.
    */
   public void segmentStoppedConsuming(final LLCSegmentName segmentName, final 
String instance) {
     String rawTableName = segmentName.getTableName();
@@ -950,7 +950,7 @@ public class PinotLLCRealtimeSegmentManager {
    *
    * TODO: We need to find a place to detect and update a gauge for 
nonConsumingPartitionsCount for a table, and reset it to 0 at the end of 
validateLLC
    */
-  public void validateLLCSegments(final TableConfig tableConfig) {
+  public void ensureAllPartitionsConsuming(final TableConfig tableConfig) {
     final String tableNameWithType = tableConfig.getTableName();
     final StreamConfig streamConfig = new 
StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
     final int partitionCount = getPartitionCount(streamConfig);
@@ -958,7 +958,7 @@ public class PinotLLCRealtimeSegmentManager {
       @Nullable
       @Override
       public IdealState apply(@Nullable IdealState idealState) {
-        return validateLLCSegments(tableConfig, idealState, partitionCount);
+        return ensureAllPartitionsConsuming(tableConfig, idealState, 
partitionCount);
       }
     }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
   }
@@ -1078,7 +1078,7 @@ public class PinotLLCRealtimeSegmentManager {
    * TODO: split this method into multiple smaller methods
    */
   @VisibleForTesting
-  protected IdealState validateLLCSegments(final TableConfig tableConfig, 
IdealState idealState,
+  protected IdealState ensureAllPartitionsConsuming(final TableConfig 
tableConfig, IdealState idealState,
       final int partitionCount) {
     final String tableNameWithType = tableConfig.getTableName();
     final StreamConfig streamConfig = new 
StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index f177a80..726c481 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -26,6 +26,7 @@ import 
com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
 import com.linkedin.pinot.common.utils.SegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import 
com.linkedin.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
@@ -52,10 +53,9 @@ public class RetentionManager extends ControllerPeriodicTask 
{
 
   private final int _deletedSegmentsRetentionInDays;
 
-  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, 
int runFrequencyInSeconds,
-      int deletedSegmentsRetentionInDays) {
-    super("RetentionManager", runFrequencyInSeconds, 
pinotHelixResourceManager);
-    _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
+  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, 
ControllerConf config) {
+    super("RetentionManager", 
config.getRetentionControllerFrequencyInSeconds(), pinotHelixResourceManager);
+    _deletedSegmentsRetentionInDays = 
config.getDeletedSegmentsRetentionInDays();
 
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, 
deletedSegmentsRetentionInDays: {}",
         getIntervalInSeconds(), _deletedSegmentsRetentionInDays);
@@ -148,7 +148,7 @@ public class RetentionManager extends 
ControllerPeriodicTask {
         // In progress segment, only check LLC segment
         if (SegmentName.isLowLevelConsumerSegmentName(segmentName)) {
           // Delete old LLC segment that hangs around. Do not delete segment 
that are current since there may be a race
-          // with ValidationManager trying to auto-create the LLC segment
+          // with RealtimeSegmentValidationManager trying to auto-create the 
LLC segment
           if (shouldDeleteInProgressLLCSegment(segmentName, idealState, 
realtimeSegmentZKMetadata)) {
             segmentsToDelete.add(segmentName);
           }
@@ -172,7 +172,7 @@ public class RetentionManager extends 
ControllerPeriodicTask {
       return false;
     }
     // delete a segment only if it is old enough (5 days) or else,
-    // 1. latest segment could get deleted in the middle of repair by 
ValidationManager
+    // 1. latest segment could get deleted in the middle of repair by 
RealtimeSegmentValidationManager
     // 2. for a brand new segment, if this code kicks in after new metadata is 
created but ideal state entry is not yet created (between step 2 and 3),
     // the latest segment metadata could get marked for deletion
     if (System.currentTimeMillis() - 
realtimeSegmentZKMetadata.getCreationTime()
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
new file mode 100644
index 0000000..40078d6
--- /dev/null
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.linkedin.pinot.controller.validation;
+
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Rebuilds the broker resource if the instance set has changed
+ */
+public class BrokerResourceValidationManager extends ControllerPeriodicTask {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BrokerResourceValidationManager.class);
+
+  private List<InstanceConfig> _instanceConfigs;
+
+  public BrokerResourceValidationManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager) {
+    super("BrokerResourceValidationManager", 
config.getBrokerResourceValidationFrequencyInSeconds(),
+        pinotHelixResourceManager);
+  }
+
+  @Override
+  protected void preprocess() {
+    _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
+  }
+
+  @Override
+  protected void processTable(String tableNameWithType) {
+    try {
+      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}, skipping 
broker resource validation", tableNameWithType);
+        return;
+      }
+
+      // Rebuild broker resource
+      Set<String> brokerInstances = 
_pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
+          tableConfig.getTenantConfig().getBroker());
+      _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, 
brokerInstances);
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while validating broker resource for 
table: {}", tableNameWithType, e);
+    }
+  }
+
+
+  @Override
+  protected void postprocess() {
+
+  }
+
+  @Override
+  protected void initTask() {
+
+  }
+
+  @Override
+  public void stopTask() {
+  }
+}
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
similarity index 62%
rename from 
pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
rename to 
pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index c5528e7..3c79205 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -19,29 +19,19 @@
 package com.linkedin.pinot.controller.validation;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.linkedin.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.metrics.ValidationMetrics;
 import com.linkedin.pinot.common.utils.CommonConstants;
-import com.linkedin.pinot.common.utils.HLCSegmentName;
-import com.linkedin.pinot.common.utils.SegmentName;
 import com.linkedin.pinot.common.utils.time.TimeUtils;
 import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
-import 
com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
-import com.linkedin.pinot.core.realtime.stream.StreamConfig;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.helix.model.InstanceConfig;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 import org.slf4j.Logger;
@@ -52,83 +42,39 @@ import org.slf4j.LoggerFactory;
  * Manages the segment validation metrics, to ensure that all offline segments 
are contiguous (no missing segments) and
  * that the offline push delay isn't too high.
  */
-public class ValidationManager extends ControllerPeriodicTask {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ValidationManager.class);
+public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(OfflineSegmentIntervalChecker.class);
 
-  private final int _segmentLevelValidationIntervalInSeconds;
-  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
   private final ValidationMetrics _validationMetrics;
 
-  private long _lastSegmentLevelValidationTimeMs = 0L;
-  private boolean _runSegmentLevelValidation;
-  private List<InstanceConfig> _instanceConfigs;
-
-  public ValidationManager(ControllerConf config, PinotHelixResourceManager 
pinotHelixResourceManager,
-      PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, 
ValidationMetrics validationMetrics) {
-    super("ValidationManager", 
config.getValidationControllerFrequencyInSeconds(), pinotHelixResourceManager);
-    _segmentLevelValidationIntervalInSeconds = 
config.getSegmentLevelValidationIntervalInSeconds();
-    Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
-    _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+  public OfflineSegmentIntervalChecker(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
+      ValidationMetrics validationMetrics) {
+    super("OfflineSegmentIntervalChecker", 
config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(),
+        pinotHelixResourceManager);
     _validationMetrics = validationMetrics;
   }
 
   @Override
   protected void preprocess() {
-    // Run segment level validation using a separate interval
-    _runSegmentLevelValidation = false;
-    long currentTimeMs = System.currentTimeMillis();
-    if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - 
_lastSegmentLevelValidationTimeMs)
-        >= _segmentLevelValidationIntervalInSeconds) {
-      LOGGER.info("Run segment-level validation");
-      _runSegmentLevelValidation = true;
-      _lastSegmentLevelValidationTimeMs = currentTimeMs;
-    }
-
-    // Cache instance configs to reduce ZK access
-    _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
   }
 
   @Override
   protected void processTable(String tableNameWithType) {
-    runValidation(tableNameWithType);
-  }
-
-  @Override
-  protected void postprocess() {
-
-  }
-
-  private void runValidation(String tableNameWithType) {
     try {
-      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      if (tableConfig == null) {
-        LOGGER.warn("Failed to find table config for table: {}, skipping 
validation", tableNameWithType);
-        return;
-      }
-
-      // Rebuild broker resource
-      Set<String> brokerInstances = 
_pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
-          tableConfig.getTenantConfig().getBroker());
-      _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, 
brokerInstances);
 
-      // Perform validation based on the table type
       CommonConstants.Helix.TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
       if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
-        if (_runSegmentLevelValidation) {
-          validateOfflineSegmentPush(tableConfig);
-        }
-      } else {
-        if (_runSegmentLevelValidation) {
-          updateRealtimeDocumentCount(tableConfig);
-        }
-        Map<String, String> streamConfigMap = 
tableConfig.getIndexingConfig().getStreamConfigs();
-        StreamConfig streamConfig = new StreamConfig(streamConfigMap);
-        if (streamConfig.hasLowLevelConsumerType()) {
-          _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
+
+        TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+        if (tableConfig == null) {
+          LOGGER.warn("Failed to find table config for table: {}, skipping 
validation", tableNameWithType);
+          return;
         }
+
+        validateOfflineSegmentPush(tableConfig);
       }
     } catch (Exception e) {
-      LOGGER.warn("Caught exception while validating table: {}", 
tableNameWithType, e);
+      LOGGER.warn("Caught exception while checking offline segment intervals 
for table: {}", tableNameWithType, e);
     }
   }
 
@@ -264,50 +210,9 @@ public class ValidationManager extends 
ControllerPeriodicTask {
     return numTotalDocs;
   }
 
-  private void updateRealtimeDocumentCount(TableConfig tableConfig) {
-    String realtimeTableName = tableConfig.getTableName();
-    List<RealtimeSegmentZKMetadata> metadataList =
-        
_pinotHelixResourceManager.getRealtimeSegmentMetadata(realtimeTableName);
-    boolean countHLCSegments = true;  // false if this table has ONLY LLC 
segments (i.e. fully migrated)
-    StreamConfig streamConfig = new 
StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
-    if (streamConfig.hasLowLevelConsumerType() && 
!streamConfig.hasHighLevelConsumerType()) {
-      countHLCSegments = false;
-    }
-    // Update the gauge to contain the total document count in the segments
-    
_validationMetrics.updateTotalDocumentCountGauge(tableConfig.getTableName(),
-        computeRealtimeTotalDocumentInSegments(metadataList, 
countHLCSegments));
-  }
-
-  @VisibleForTesting
-  static long 
computeRealtimeTotalDocumentInSegments(List<RealtimeSegmentZKMetadata> 
realtimeSegmentZKMetadataList,
-      boolean countHLCSegments) {
-    long numTotalDocs = 0;
-
-    String groupId = "";
-    for (RealtimeSegmentZKMetadata realtimeSegmentZKMetadata : 
realtimeSegmentZKMetadataList) {
-      String segmentName = realtimeSegmentZKMetadata.getSegmentName();
-      if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
-        if (countHLCSegments) {
-          HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
-          String segmentGroupIdName = hlcSegmentName.getGroupId();
-
-          if (groupId.isEmpty()) {
-            groupId = segmentGroupIdName;
-          }
-          // Discard all segments with different groupids as they are replicas
-          if (groupId.equals(segmentGroupIdName) && 
realtimeSegmentZKMetadata.getTotalRawDocs() >= 0) {
-            numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
-          }
-        }
-      } else {
-        // Low level segments
-        if (!countHLCSegments) {
-          numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
-        }
-      }
-    }
+  @Override
+  protected void postprocess() {
 
-    return numTotalDocs;
   }
 
   @Override
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
new file mode 100644
index 0000000..3274df3
--- /dev/null
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.linkedin.pinot.controller.validation;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import com.linkedin.pinot.common.metrics.ValidationMetrics;
+import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.common.utils.HLCSegmentName;
+import com.linkedin.pinot.common.utils.SegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import 
com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import com.linkedin.pinot.core.realtime.stream.StreamConfig;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Validates realtime ideal states and segment metadata, fixing any partitions 
which have stopped consuming
+ */
+public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeSegmentValidationManager.class);
+
+  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  private final ValidationMetrics _validationMetrics;
+
+  private final int _segmentLevelValidationIntervalInSeconds;
+  private long _lastUpdateRealtimeDocumentCountTimeMs = 0L;
+  private boolean _updateRealtimeDocumentCount;
+
+  public RealtimeSegmentValidationManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
+      PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, 
ValidationMetrics validationMetrics) {
+    super("RealtimeSegmentValidationManager", 
config.getRealtimeSegmentValidationFrequencyInSeconds(),
+        pinotHelixResourceManager);
+    _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+    _validationMetrics = validationMetrics;
+
+    _segmentLevelValidationIntervalInSeconds = 
config.getSegmentLevelValidationIntervalInSeconds();
+    Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
+  }
+
+  @Override
+  protected void preprocess() {
+    // Update realtime document counts only if the segment-level validation 
interval has elapsed since the previous update
+    _updateRealtimeDocumentCount = false;
+    long currentTimeMs = System.currentTimeMillis();
+    if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - 
_lastUpdateRealtimeDocumentCountTimeMs)
+        >= _segmentLevelValidationIntervalInSeconds) {
+      LOGGER.info("Run segment-level validation");
+      _updateRealtimeDocumentCount = true;
+      _lastUpdateRealtimeDocumentCountTimeMs = currentTimeMs;
+    }
+  }
+
+  @Override
+  protected void processTable(String tableNameWithType) {
+    try {
+      CommonConstants.Helix.TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == CommonConstants.Helix.TableType.REALTIME) {
+
+        TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+        if (tableConfig == null) {
+          LOGGER.warn("Failed to find table config for table: {}, skipping 
validation", tableNameWithType);
+          return;
+        }
+
+        if (_updateRealtimeDocumentCount) {
+          updateRealtimeDocumentCount(tableConfig);
+        }
+
+        Map<String, String> streamConfigMap = 
tableConfig.getIndexingConfig().getStreamConfigs();
+        StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+        if (streamConfig.hasLowLevelConsumerType()) {
+          _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while validating realtime table: {}", 
tableNameWithType, e);
+    }
+  }
+
+  private void updateRealtimeDocumentCount(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    List<RealtimeSegmentZKMetadata> metadataList =
+        
_pinotHelixResourceManager.getRealtimeSegmentMetadata(realtimeTableName);
+    boolean countHLCSegments = true;  // false if this table has ONLY LLC 
segments (i.e. fully migrated)
+    StreamConfig streamConfig = new 
StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
+    if (streamConfig.hasLowLevelConsumerType() && 
!streamConfig.hasHighLevelConsumerType()) {
+      countHLCSegments = false;
+    }
+    // Update the gauge to contain the total document count in the segments
+    
_validationMetrics.updateTotalDocumentCountGauge(tableConfig.getTableName(),
+        computeRealtimeTotalDocumentInSegments(metadataList, 
countHLCSegments));
+  }
+
+  @VisibleForTesting
+  static long 
computeRealtimeTotalDocumentInSegments(List<RealtimeSegmentZKMetadata> 
realtimeSegmentZKMetadataList,
+      boolean countHLCSegments) {
+    long numTotalDocs = 0;
+
+    String groupId = "";
+    for (RealtimeSegmentZKMetadata realtimeSegmentZKMetadata : 
realtimeSegmentZKMetadataList) {
+      String segmentName = realtimeSegmentZKMetadata.getSegmentName();
+      if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
+        if (countHLCSegments) {
+          HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
+          String segmentGroupIdName = hlcSegmentName.getGroupId();
+
+          if (groupId.isEmpty()) {
+            groupId = segmentGroupIdName;
+          }
+          // Discard all segments with different groupids as they are replicas
+          if (groupId.equals(segmentGroupIdName) && 
realtimeSegmentZKMetadata.getTotalRawDocs() >= 0) {
+            numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
+          }
+        }
+      } else {
+        // Low level segments
+        if (!countHLCSegments) {
+          numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
+        }
+      }
+    }
+
+    return numTotalDocs;
+  }
+
+  @Override
+  protected void postprocess() {
+
+  }
+
+  @Override
+  protected void initTask() {
+
+  }
+
+  @Override
+  public void stopTask() {
+    LOGGER.info("Unregister all the validation metrics.");
+    _validationMetrics.unregisterAllMetrics();
+  }
+}
diff --git 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 36f3386..fd417a5 100644
--- 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -369,7 +369,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       oldMetadataMap.put(entry.getKey(), new 
LLCRealtimeSegmentZKMetadata(entry.getValue().toZNRecord()));
     }
     
segmentManager._partitionAssignmentGenerator.setConsumingInstances(instances);
-    IdealState updatedIdealState = 
segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+    IdealState updatedIdealState = 
segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, 
nPartitions);
     Map<String, Map<String, String>> updatedMapFields = 
updatedIdealState.getRecord().getMapFields();
     Map<String, LLCRealtimeSegmentZKMetadata> updatedMetadataMap = 
segmentManager._metadataMap;
 
@@ -525,13 +525,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
           Map<String, Map<String, String>> oldMapFields = 
idealStateCopy.getRecord().getMapFields();
 
           if (tooSoonToCorrect) {
-            segmentManager.validateLLCSegments(tableConfig, idealState, 
nPartitions);
+            segmentManager.ensureAllPartitionsConsuming(tableConfig, 
idealState, nPartitions);
             // validate that all entries in oldMapFields are unchanged in new 
ideal state
             verifyNoChangeToOldEntries(oldMapFields, idealState);
             segmentManager.tooSoonToCorrect = false;
           }
 
-          segmentManager.validateLLCSegments(tableConfig, idealState, 
nPartitions);
+          segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, 
nPartitions);
 
           // verify that new segment gets created in ideal state with CONSUMING
           
Assert.assertNotNull(idealState.getRecord().getMapFields().get(llcSegmentName.getSegmentName()));
@@ -572,13 +572,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
           Map<String, Map<String, String>> oldMapFields = 
idealStateCopy.getRecord().getMapFields();
 
           if (tooSoonToCorrect) {
-            segmentManager.validateLLCSegments(tableConfig, idealState, 
nPartitions);
+            segmentManager.ensureAllPartitionsConsuming(tableConfig, 
idealState, nPartitions);
             // validate nothing changed and try again with disabled
             verifyNoChangeToOldEntries(oldMapFields, idealState);
             segmentManager.tooSoonToCorrect = false;
           }
 
-          segmentManager.validateLLCSegments(tableConfig, idealState, 
nPartitions);
+          segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, 
nPartitions);
 
           // verify that new segment gets created in ideal state with 
CONSUMING and old segment ONLINE
           
Assert.assertNotNull(idealState.getRecord().getMapFields().get(latestSegment.getSegmentName()).values().
@@ -615,7 +615,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
               (ZNRecord) 
znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
           Map<String, Map<String, String>> oldMapFields = 
idealStateCopy.getRecord().getMapFields();
 
-          segmentManager.validateLLCSegments(tableConfig, idealState, 
nPartitions);
+          segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, 
nPartitions);
 
           verifyRepairs(tableConfig, idealState, expectedPartitionAssignment, 
segmentManager, oldMapFields);
         } else {
@@ -644,12 +644,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
             Map<String, Map<String, String>> oldMapFields = 
idealStateCopy.getRecord().getMapFields();
 
             if (tooSoonToCorrect) {
-              segmentManager.validateLLCSegments(tableConfig, idealState, 
nPartitions);
+              segmentManager.ensureAllPartitionsConsuming(tableConfig, 
idealState, nPartitions);
               // validate nothing changed and try again with disabled
               verifyNoChangeToOldEntries(oldMapFields, idealState);
               segmentManager.tooSoonToCorrect = false;
             }
-            segmentManager.validateLLCSegments(tableConfig, idealState, 
nPartitions);
+            segmentManager.ensureAllPartitionsConsuming(tableConfig, 
idealState, nPartitions);
 
             verifyRepairs(tableConfig, idealState, 
expectedPartitionAssignment, segmentManager, oldMapFields);
           } else {
@@ -677,13 +677,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
               Map<String, Map<String, String>> oldMapFields = 
idealStateCopy.getRecord().getMapFields();
 
               if (tooSoonToCorrect) {
-                segmentManager.validateLLCSegments(tableConfig, idealState, 
nPartitions);
+                segmentManager.ensureAllPartitionsConsuming(tableConfig, 
idealState, nPartitions);
                 // validate nothing changed and try again with disabled
                 verifyNoChangeToOldEntries(oldMapFields, idealState);
                 segmentManager.tooSoonToCorrect = false;
               }
 
-              segmentManager.validateLLCSegments(tableConfig, idealState, 
nPartitions);
+              segmentManager.ensureAllPartitionsConsuming(tableConfig, 
idealState, nPartitions);
 
               verifyRepairs(tableConfig, idealState, 
expectedPartitionAssignment, segmentManager, oldMapFields);
             } else {
@@ -695,7 +695,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
                   (ZNRecord) 
znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
               Map<String, Map<String, String>> oldMapFields = 
idealStateCopy.getRecord().getMapFields();
 
-              segmentManager.validateLLCSegments(tableConfig, idealState, 
nPartitions);
+              segmentManager.ensureAllPartitionsConsuming(tableConfig, 
idealState, nPartitions);
 
               // verify that nothing changed
               verifyNoChangeToOldEntries(oldMapFields, idealState);
@@ -830,7 +830,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     IdealState idealState = idealStateBuilder.clear().build();
     segmentManager._metadataMap.clear();
 
-    segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+    segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, 
nPartitions);
     PartitionAssignment partitionAssignment =
         
segmentManager._partitionAssignmentGenerator.getStreamPartitionAssignmentFromIdealState(tableConfig,
             idealState);
@@ -845,7 +845,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       FakePinotLLCRealtimeSegmentManager segmentManager, TableConfig 
tableConfig, int nPartitions) {
     IdealState idealState = idealStateBuilder.clear().build();
     segmentManager._metadataMap.clear();
-    segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+    segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, 
nPartitions);
     return idealStateBuilder.build();
   }
 
diff --git 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 3e60350..d2fe9b3 100644
--- 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -26,6 +26,7 @@ import 
com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.segment.SegmentMetadata;
 import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.LLCSegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import com.linkedin.pinot.controller.helix.core.SegmentDeletionManager;
@@ -87,7 +88,10 @@ public class RetentionManagerTest {
     
when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
     
when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
 
-    RetentionManager retentionManager = new 
RetentionManager(pinotHelixResourceManager, 0, 0);
+    ControllerConf conf = new ControllerConf();
+    conf.setRetentionControllerFrequencyInSeconds(0);
+    conf.setDeletedSegmentsRetentionInDays(0);
+    RetentionManager retentionManager = new 
RetentionManager(pinotHelixResourceManager, conf);
     retentionManager.init();
     retentionManager.run();
 
@@ -204,7 +208,10 @@ public class RetentionManagerTest {
         setupSegmentMetadata(tableConfig, now, initialNumSegments, 
removedSegments);
     setupPinotHelixResourceManager(tableConfig, removedSegments, 
pinotHelixResourceManager);
 
-    RetentionManager retentionManager = new 
RetentionManager(pinotHelixResourceManager, 0, 0);
+    ControllerConf conf = new ControllerConf();
+    conf.setRetentionControllerFrequencyInSeconds(0);
+    conf.setDeletedSegmentsRetentionInDays(0);
+    RetentionManager retentionManager = new 
RetentionManager(pinotHelixResourceManager, conf);
     retentionManager.init();
     retentionManager.run();
 
diff --git 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
index 0526afb..f69657b 100644
--- 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
+++ 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
@@ -52,7 +52,7 @@ import static org.testng.Assert.*;
 
 
 /**
- * Tests for the ValidationManager.
+ * Tests for the ValidationManagers.
  */
 public class ValidationManagerTest {
   private String HELIX_CLUSTER_NAME = "TestValidationManager";
@@ -178,7 +178,7 @@ public class ValidationManagerTest {
     segmentZKMetadataList.add(
         
SegmentMetadataMockUtils.mockRealtimeSegmentZKMetadata(TEST_TABLE_NAME, 
segmentName4, 20));
 
-    
assertEquals(ValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList,
 true), 60);
+    
assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList,
 true), 60);
 
     // Now add some low level segment names
     String segmentName5 = new LLCSegmentName(TEST_TABLE_NAME, 1, 0, 
1000).getSegmentName();
@@ -188,7 +188,7 @@ public class ValidationManagerTest {
     
segmentZKMetadataList.add(SegmentMetadataMockUtils.mockRealtimeSegmentZKMetadata(TEST_TABLE_NAME,
 segmentName6, 5));
 
     // Only the LLC segments should get counted.
-    
assertEquals(ValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList,
 false), 15);
+    
assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList,
 false), 15);
   }
 
   @AfterClass
@@ -210,18 +210,18 @@ public class ValidationManagerTest {
     jan1st2nd3rd.add(jan1st);
     jan1st2nd3rd.add(jan2nd);
     jan1st2nd3rd.add(jan3rd);
-    assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd3rd, Duration.standardDays(1)), 0);
+    assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd3rd, Duration.standardDays(1)), 0);
 
     ArrayList<Interval> jan1st2nd3rd5th = new ArrayList<>(jan1st2nd3rd);
     jan1st2nd3rd5th.add(jan5th);
-    assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd3rd5th, Duration.standardDays(1)), 1);
+    assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd3rd5th, Duration.standardDays(1)), 1);
 
     // Should also work if the intervals are in random order
     ArrayList<Interval> jan5th2nd1st = new ArrayList<>();
     jan5th2nd1st.add(jan5th);
     jan5th2nd1st.add(jan2nd);
     jan5th2nd1st.add(jan1st);
-    assertEquals(ValidationManager.computeNumMissingSegments(jan5th2nd1st, Duration.standardDays(1)), 2);
+    assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan5th2nd1st, Duration.standardDays(1)), 2);
 
     // Should also work if the intervals are of different sizes
     Interval jan1stAnd2nd = new Interval(new DateTime(2015, 1, 1, 0, 0, 0), new DateTime(2015, 1, 2, 23, 59, 59));
@@ -229,6 +229,6 @@ public class ValidationManagerTest {
     jan1st2nd4th5th.add(jan1stAnd2nd);
     jan1st2nd4th5th.add(jan4th);
     jan1st2nd4th5th.add(jan5th);
-    assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd4th5th, Duration.standardDays(1)), 1);
+    assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd4th5th, Duration.standardDays(1)), 1);
   }
 }
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
index 28eb463..5ef1278 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
@@ -28,7 +28,7 @@ import com.linkedin.pinot.common.utils.LLCSegmentName;
 import com.linkedin.pinot.common.utils.NetUtil;
 import com.linkedin.pinot.common.utils.ZkStarter;
 import com.linkedin.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
-import com.linkedin.pinot.controller.validation.ValidationManager;
+import com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager;
 import com.linkedin.pinot.server.realtime.ControllerLeaderLocator;
 import com.linkedin.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import com.linkedin.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory;
@@ -155,7 +155,7 @@ public class SegmentCompletionIntegrationTests extends LLCRealtimeClusterIntegra
     final String oldSegment = _currentSegment;
 
     // Now call the validation manager, and the segment should fix itself
-    ValidationManager validationManager = _controllerStarter.getValidationManager();
+    RealtimeSegmentValidationManager validationManager = _controllerStarter.getRealtimeSegmentValidationManager();
     validationManager.init();
     validationManager.run();
 
diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
index 49c56b1..aa82182 100644
--- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
+++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
@@ -150,7 +150,9 @@ public class StartControllerCommand extends AbstractBaseAdminCommand implements
         conf.setTenantIsolationEnabled(_tenantIsolation);
 
         conf.setRetentionControllerFrequencyInSeconds(3600 * 6);
-        conf.setValidationControllerFrequencyInSeconds(3600);
+        conf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(3600);
+        conf.setRealtimeSegmentValidationFrequencyInSeconds(3600);
+        conf.setBrokerResourceValidationFrequencyInSeconds(3600);
       }
 
       LOGGER.info("Executing command: " + toString());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to