This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch periodic_task
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit b12a7cf7b6c1fc2715cc0063137dbab79e894257
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Nov 13 18:30:02 2018 -0800

    Enhance controller periodic task and scheduler
    
    For ControllerPeriodicTask:
    1. Random pick an initial delay of range 2-5 mins so each task does not 
start at the same time
    2. Make leadership information private to the class
    For PeriodicTaskScheduler:
    1. Filter out tasks with non-positive interval, so we can easily disable 
any task by setting the interval to an non-positive value
    2. Set the size of thread pool to be the same as number of tasks
    3. Reduce the run-time of PeriodicTaskSchedulerTest
---
 .../pinot/controller/ControllerStarter.java        |  16 +-
 .../controller/helix/SegmentStatusChecker.java     |  17 +-
 .../helix/core/minion/PinotTaskManager.java        |  16 +-
 .../core/periodictask/ControllerPeriodicTask.java  |  64 ++++----
 .../core/relocation/RealtimeSegmentRelocator.java  |  13 +-
 .../helix/core/retention/RetentionManager.java     |  16 +-
 .../controller/validation/ValidationManager.java   |  16 +-
 .../controller/helix/SegmentStatusCheckerTest.java |  68 +-------
 .../periodictask/ControllerPeriodicTaskTest.java   | 128 ++++++++-------
 .../pinot/core/periodictask/BasePeriodicTask.java  |  12 +-
 .../core/periodictask/PeriodicTaskScheduler.java   |  59 ++++---
 .../periodictask/PeriodicTaskSchedulerTest.java    | 181 +++++----------------
 12 files changed, 230 insertions(+), 376 deletions(-)

diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index 69a4a4f..dd5041d 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -38,9 +38,9 @@ import 
com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrate
 import 
com.linkedin.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator;
 import com.linkedin.pinot.controller.helix.core.retention.RetentionManager;
 import com.linkedin.pinot.controller.validation.ValidationManager;
+import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
 import com.linkedin.pinot.core.periodictask.PeriodicTask;
 import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler;
-import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
 import com.linkedin.pinot.filesystem.PinotFSFactory;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.io.File;
@@ -166,11 +166,9 @@ public class ControllerStarter {
 
     List<PeriodicTask> periodicTasks = new ArrayList<>();
 
+    LOGGER.info("Adding task manager to periodic task scheduler");
     _taskManager = new PinotTaskManager(_helixTaskResourceManager, 
_helixResourceManager, _config, _controllerMetrics);
-    if (_taskManager.getIntervalInSeconds() > 0) {
-      LOGGER.info("Adding task manager to periodic task scheduler");
-      periodicTasks.add(_taskManager);
-    }
+    periodicTasks.add(_taskManager);
 
     LOGGER.info("Adding retention manager to periodic task scheduler");
     periodicTasks.add(_retentionManager);
@@ -187,12 +185,8 @@ public class ControllerStarter {
     _realtimeSegmentsManager.start(_controllerMetrics);
     PinotLLCRealtimeSegmentManager.getInstance().start();
 
-    if (_segmentStatusChecker.getIntervalInSeconds() == -1L) {
-      LOGGER.warn("Segment status check interval is -1, status checks 
disabled.");
-    } else {
-      LOGGER.info("Adding segment status checker to periodic task scheduler");
-      periodicTasks.add(_segmentStatusChecker);
-    }
+    LOGGER.info("Adding segment status checker to periodic task scheduler");
+    periodicTasks.add(_segmentStatusChecker);
 
     LOGGER.info("Adding realtime segment relocation manager to periodic task 
scheduler");
     periodicTasks.add(_realtimeSegmentRelocator);
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
index 4e51bd6..549dc9d 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
@@ -27,8 +27,6 @@ import 
com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicT
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.httpclient.HttpConnectionManager;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.ExternalView;
@@ -69,7 +67,6 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask {
     _config = config;
     _waitForPushTimeSeconds = 
config.getStatusCheckerWaitForPushTimeInSeconds();
     _metricsRegistry = metricsRegistry;
-    HttpConnectionManager httpConnectionManager = new 
MultiThreadedHttpConnectionManager();
   }
 
   @Override
@@ -85,15 +82,17 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask {
   }
 
   @Override
-  public void process(List<String> allTableNames) {
-    updateSegmentMetrics(allTableNames);
+  public void process(List<String> tables) {
+    updateSegmentMetrics(tables);
   }
 
   /**
-   * Runs a segment status pass over the currently loaded tables.
-   * @param allTableNames List of all the table names
+   * Runs a segment status pass over the given tables.
+   * TODO: revisit the logic and reduce the ZK access
+   *
+   * @param tables List of table names
    */
-  private void updateSegmentMetrics(List<String> allTableNames) {
+  private void updateSegmentMetrics(List<String> tables) {
     // Fetch the list of tables
     String helixClusterName = _pinotHelixResourceManager.getHelixClusterName();
     HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
@@ -112,7 +111,7 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask {
       logDisabledTables = false;
     }
 
-    for (String tableName : allTableNames) {
+    for (String tableName : tables) {
       try {
         if (TableNameBuilder.getTableTypeFromTableName(tableName) == 
TableType.OFFLINE) {
           offlineTableCount++;
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
index 460121e..0ff695d 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -52,8 +52,7 @@ public class PinotTaskManager extends ControllerPeriodicTask {
   public PinotTaskManager(@Nonnull PinotHelixTaskResourceManager 
helixTaskResourceManager,
       @Nonnull PinotHelixResourceManager helixResourceManager, @Nonnull 
ControllerConf controllerConf,
       @Nonnull ControllerMetrics controllerMetrics) {
-    super("PinotTaskManager", 
controllerConf.getTaskManagerFrequencyInSeconds(),
-        Math.min(60, controllerConf.getTaskManagerFrequencyInSeconds()), 
helixResourceManager);
+    super("PinotTaskManager", 
controllerConf.getTaskManagerFrequencyInSeconds(), helixResourceManager);
     _helixTaskResourceManager = helixTaskResourceManager;
     _clusterInfoProvider = new ClusterInfoProvider(helixResourceManager, 
helixTaskResourceManager, controllerConf);
     _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
@@ -82,12 +81,13 @@ public class PinotTaskManager extends 
ControllerPeriodicTask {
   }
 
   /**
-   * Check the Pinot cluster status and schedule new tasks.
-   * @param allTableNames List of all the table names
+   * Check the Pinot cluster status and schedule new tasks for the given 
tables.
+   *
+   * @param tables List of table names
    * @return Map from task type to task scheduled
    */
   @Nonnull
-  private Map<String, String> scheduleTasks(List<String> allTableNames) {
+  private Map<String, String> scheduleTasks(List<String> tables) {
     
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
 1L);
 
     Set<String> taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
@@ -102,7 +102,7 @@ public class PinotTaskManager extends 
ControllerPeriodicTask {
     }
 
     // Scan all table configs to get the tables with tasks enabled
-    for (String tableName : allTableNames) {
+    for (String tableName : tables) {
       TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableName);
       if (tableConfig != null) {
         TableTaskConfig taskConfig = tableConfig.getTaskConfig();
@@ -155,7 +155,7 @@ public class PinotTaskManager extends 
ControllerPeriodicTask {
   }
 
   @Override
-  public void process(List<String> allTableNames) {
-    scheduleTasks(allTableNames);
+  public void process(List<String> tables) {
+    scheduleTasks(tables);
   }
 }
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index f00e715..53da198 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -15,10 +15,10 @@
  */
 package com.linkedin.pinot.controller.helix.core.periodictask;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.core.periodictask.BasePeriodicTask;
 import java.util.List;
+import java.util.Random;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,21 +28,29 @@ import org.slf4j.LoggerFactory;
  * which table resources should be managed by this Pinot controller.
  */
 public abstract class ControllerPeriodicTask extends BasePeriodicTask {
-  public static final Logger LOGGER = 
LoggerFactory.getLogger(ControllerPeriodicTask.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ControllerPeriodicTask.class);
+
+  public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
+  public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
+
   protected final PinotHelixResourceManager _pinotHelixResourceManager;
-  private boolean _amILeader;
-  private static final int DEFAULT_INITIAL_DELAY_IN_SECOND = 120;
 
-  public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
-      PinotHelixResourceManager pinotHelixResourceManager) {
-    this(taskName, runFrequencyInSeconds, DEFAULT_INITIAL_DELAY_IN_SECOND, 
pinotHelixResourceManager);
-  }
+  private boolean _wasLeader = false;
 
   public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, 
long initialDelayInSeconds,
       PinotHelixResourceManager pinotHelixResourceManager) {
     super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
     _pinotHelixResourceManager = pinotHelixResourceManager;
-    setAmILeader(false);
+  }
+
+  public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
+      PinotHelixResourceManager pinotHelixResourceManager) {
+    this(taskName, runFrequencyInSeconds, getRandomInitialDelayInSeconds(), 
pinotHelixResourceManager);
+  }
+
+  private static long getRandomInitialDelayInSeconds() {
+    return MIN_INITIAL_DELAY_IN_SECONDS + new Random().nextInt(
+        MAX_INITIAL_DELAY_IN_SECONDS - MIN_INITIAL_DELAY_IN_SECONDS);
   }
 
   @Override
@@ -60,38 +68,29 @@ public abstract class ControllerPeriodicTask extends 
BasePeriodicTask {
   }
 
   private void skipLeaderTask() {
-    if (getAmILeader()) {
+    if (_wasLeader) {
       LOGGER.info("Current pinot controller lost leadership.");
       onBecomeNotLeader();
+      _wasLeader = false;
     }
-    setAmILeader(false);
-    LOGGER.info("Skip running periodic task: {} on non-leader controller", 
getTaskName());
+    LOGGER.info("Skip running periodic task: {} on non-leader controller", 
_taskName);
   }
 
-  private void processLeaderTask(List<String> allTableNames) {
-    if (!getAmILeader()) {
+  private void processLeaderTask(List<String> tables) {
+    if (!_wasLeader) {
       LOGGER.info("Current pinot controller became leader. Starting {} with 
running frequency of {} seconds.",
-          getTaskName(), getIntervalInSeconds());
+          _taskName, _intervalInSeconds);
       onBecomeLeader();
+      _wasLeader = true;
     }
-    setAmILeader(true);
     long startTime = System.currentTimeMillis();
-    LOGGER.info("Starting to process {} tables in periodic task: {}", 
allTableNames.size(), getTaskName());
-    process(allTableNames);
-    LOGGER.info("Finished processing {} tables in periodic task: {} in {}ms", 
allTableNames.size(), getTaskName(),
+    int numTables = tables.size();
+    LOGGER.info("Start processing {} tables in periodic task: {}", numTables, 
_taskName);
+    process(tables);
+    LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms", 
numTables, _taskName,
         (System.currentTimeMillis() - startTime));
   }
 
-  @VisibleForTesting
-  public boolean getAmILeader() {
-    return _amILeader;
-  }
-
-  @VisibleForTesting
-  public void setAmILeader(boolean amILeader) {
-    _amILeader = amILeader;
-  }
-
   /**
    * Does the following logic when losing the leadership. This should be done 
only once during leadership transition.
    */
@@ -105,8 +104,9 @@ public abstract class ControllerPeriodicTask extends 
BasePeriodicTask {
   }
 
   /**
-   * Processes the periodic task as lead controller.
-   * @param allTableNames List of all the table names
+   * Processes the task on the given tables.
+   *
+   * @param tables List of table names
    */
-  public abstract void process(List<String> allTableNames);
+  public abstract void process(List<String> tables);
 }
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index 0954f5b..ebd5f37 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -58,18 +58,19 @@ public class RealtimeSegmentRelocator extends 
ControllerPeriodicTask {
   }
 
   @Override
-  public void process(List<String> allTableNames) {
-    runRelocation(allTableNames);
+  public void process(List<String> tables) {
+    runRelocation(tables);
   }
 
   /**
-   * Check all tables. Perform relocation of segments if table is realtime and 
relocation is required
+   * Check the given tables. Perform relocation of segments if table is 
realtime and relocation is required
    * TODO: Model this to implement {@link 
com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy} 
interface
    * https://github.com/linkedin/pinot/issues/2609
-   * @param allTableNames List of all the table names
+   *
+   * @param tables List of table names
    */
-  private void runRelocation(List<String> allTableNames) {
-    for (final String tableNameWithType : allTableNames) {
+  private void runRelocation(List<String> tables) {
+    for (final String tableNameWithType : tables) {
       // Only consider realtime tables.
       if (!TableNameBuilder.REALTIME.tableHasTypeSuffix(tableNameWithType)) {
         continue;
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index 7423cef..d76cc3b 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -51,8 +51,7 @@ public class RetentionManager extends ControllerPeriodicTask {
 
   public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, 
int runFrequencyInSeconds,
       int deletedSegmentsRetentionInDays) {
-    super("RetentionManager", runFrequencyInSeconds, Math.min(60, 
runFrequencyInSeconds),
-        pinotHelixResourceManager);
+    super("RetentionManager", runFrequencyInSeconds, 
pinotHelixResourceManager);
     _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
   }
 
@@ -63,17 +62,18 @@ public class RetentionManager extends 
ControllerPeriodicTask {
   }
 
   @Override
-  public void process(List<String> allTableNames) {
-    execute(allTableNames);
+  public void process(List<String> tables) {
+    execute(tables);
   }
 
   /**
-   * Manages retention for all tables.
-   * @param allTableNames List of all the table names
+   * Manages retention for the given tables.
+   *
+   * @param tables List of table names
    */
-  private void execute(List<String> allTableNames) {
+  private void execute(List<String> tables) {
     try {
-      for (String tableNameWithType : allTableNames) {
+      for (String tableNameWithType : tables) {
         LOGGER.info("Start managing retention for table: {}", 
tableNameWithType);
         manageRetentionForTable(tableNameWithType);
       }
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
index ce2b404..8717318 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
@@ -60,8 +60,7 @@ public class ValidationManager extends ControllerPeriodicTask 
{
 
   public ValidationManager(ControllerConf config, PinotHelixResourceManager 
pinotHelixResourceManager,
       PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, 
ValidationMetrics validationMetrics) {
-    super("ValidationManager", 
config.getValidationControllerFrequencyInSeconds(),
-        config.getValidationControllerFrequencyInSeconds() / 2, 
pinotHelixResourceManager);
+    super("ValidationManager", 
config.getValidationControllerFrequencyInSeconds(), pinotHelixResourceManager);
     _segmentLevelValidationIntervalInSeconds = 
config.getSegmentLevelValidationIntervalInSeconds();
     Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
@@ -75,15 +74,16 @@ public class ValidationManager extends 
ControllerPeriodicTask {
   }
 
   @Override
-  public void process(List<String> allTableNames) {
-    runValidation(allTableNames);
+  public void process(List<String> tables) {
+    runValidation(tables);
   }
 
   /**
-   * Runs a validation pass over the currently loaded tables.
-   * @param allTableNames List of all the table names
+   * Runs a validation pass over the given tables.
+   *
+   * @param tables List of table names
    */
-  private void runValidation(List<String> allTableNames) {
+  private void runValidation(List<String> tables) {
     // Run segment level validation using a separate interval
     boolean runSegmentLevelValidation = false;
     long currentTimeMs = System.currentTimeMillis();
@@ -97,7 +97,7 @@ public class ValidationManager extends ControllerPeriodicTask 
{
     // Cache instance configs to reduce ZK access
     List<InstanceConfig> instanceConfigs = 
_pinotHelixResourceManager.getAllHelixInstanceConfigs();
 
-    for (String tableNameWithType : allTableNames) {
+    for (String tableNameWithType : tables) {
       try {
         TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
         if (tableConfig == null) {
diff --git 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
index 16248db..de6eca9 100644
--- 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -33,13 +33,9 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.testng.Assert;
-import org.testng.annotations.AfterSuite;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 
 public class SegmentStatusCheckerTest {
@@ -49,18 +45,6 @@ public class SegmentStatusCheckerTest {
   private ControllerMetrics controllerMetrics;
   private ControllerConf config;
 
-  @BeforeSuite
-  public void setUp() throws Exception {
-  }
-
-  @AfterSuite
-  public void tearDown() {
-  }
-
-  @BeforeMethod
-  public void beforeMethod() {
-  }
-
   @Test
   public void offlineBasicTest() throws Exception {
     final String tableName = "myTable_OFFLINE";
@@ -104,7 +88,6 @@ public class SegmentStatusCheckerTest {
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
-    Assert.assertEquals(segmentStatusChecker.getAmILeader(), false);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -115,7 +98,6 @@ public class SegmentStatusCheckerTest {
         ControllerGauge.PERCENT_OF_REPLICAS), 33);
     
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
         ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
-    Assert.assertEquals(segmentStatusChecker.getAmILeader(), true);
   }
 
   @Test
@@ -172,7 +154,6 @@ public class SegmentStatusCheckerTest {
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
-    Assert.assertEquals(segmentStatusChecker.getAmILeader(), false);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -183,53 +164,6 @@ public class SegmentStatusCheckerTest {
         ControllerGauge.PERCENT_OF_REPLICAS), 100);
     
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
         ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
-    Assert.assertEquals(segmentStatusChecker.getAmILeader(), true);
-  }
-
-  @Test
-  public void nonLeaderTest() throws Exception {
-    final String tableName = "myTable_REALTIME";
-    List<String> allTableNames = new ArrayList<String>();
-    allTableNames.add(tableName);
-
-    HelixAdmin helixAdmin;
-    {
-      helixAdmin = mock(HelixAdmin.class);
-    }
-    {
-      helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(false);
-      when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
-      
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
-      when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
-    }
-    {
-      config = mock(ControllerConf.class);
-      when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
-      when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
-    }
-    metricsRegistry = new MetricsRegistry();
-    controllerMetrics = new ControllerMetrics(metricsRegistry);
-
-    // From non-leader to non-leader.
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
-    segmentStatusChecker.init();
-    segmentStatusChecker.run();
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE),
-        Long.MIN_VALUE);
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.NUMBER_OF_REPLICAS),
-        Long.MIN_VALUE);
-    Assert.assertEquals(segmentStatusChecker.getAmILeader(), false);
-
-    // Leadership transition from leader to non-leader.
-    controllerMetrics.setValueOfTableGauge(tableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE, 0L);
-    segmentStatusChecker.setAmILeader(true);
-    segmentStatusChecker.run();
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE),
-        Long.MIN_VALUE);
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.NUMBER_OF_REPLICAS),
-        Long.MIN_VALUE);
-    Assert.assertEquals(segmentStatusChecker.getAmILeader(), false);
   }
 
   @Test
diff --git 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 354456f..6a2c1b1 100644
--- 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -16,75 +16,91 @@
 package com.linkedin.pinot.controller.helix.core.periodictask;
 
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
-import com.linkedin.pinot.core.periodictask.PeriodicTask;
-import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import junit.framework.Assert;
-import org.testng.annotations.BeforeTest;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.*;
+import static org.testng.Assert.*;
 
 
 public class ControllerPeriodicTaskTest {
-  private PinotHelixResourceManager helixResourceManager;
-  private AtomicInteger numOfProcessingMessages;
-
-  @BeforeTest
-  public void setUp() {
-    numOfProcessingMessages = new AtomicInteger(0);
-    helixResourceManager = mock(PinotHelixResourceManager.class);
-    List<String> allTableNames = new ArrayList<>();
-    allTableNames.add("testTable_REALTIME");
-    allTableNames.add("testTable_OFFLINE");
-    when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
+  private static final long RUN_FREQUENCY_IN_SECONDS = 30;
+
+  private final PinotHelixResourceManager _resourceManager = 
mock(PinotHelixResourceManager.class);
+  private final AtomicBoolean _onBecomeLeaderCalled = new AtomicBoolean();
+  private final AtomicBoolean _onBecomeNonLeaderCalled = new AtomicBoolean();
+  private final AtomicBoolean _processCalled = new AtomicBoolean();
+
+  private final ControllerPeriodicTask _task =
+      new ControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, 
_resourceManager) {
+        @Override
+        public void onBecomeLeader() {
+          _onBecomeLeaderCalled.set(true);
+        }
+
+        @Override
+        public void onBecomeNotLeader() {
+          _onBecomeNonLeaderCalled.set(true);
+        }
+
+        @Override
+        public void process(List<String> tables) {
+          _processCalled.set(true);
+        }
+      };
+
+  private void resetState() {
+    _onBecomeLeaderCalled.set(false);
+    _onBecomeNonLeaderCalled.set(false);
+    _processCalled.set(false);
   }
 
   @Test
-  public void testWhenControllerIsLeader() throws InterruptedException {
-    long totalRunTimeInMilliseconds = 3_500L;
-    long runFrequencyInSeconds = 1L;
-    long initialDelayInSeconds = 1L;
-    when(helixResourceManager.isLeader()).thenReturn(true);
-
-    PeriodicTask periodicTask = createMockPeriodicTask(runFrequencyInSeconds, 
initialDelayInSeconds);
-
-    PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
-    periodicTaskScheduler.start(Collections.singletonList(periodicTask));
-    Thread.sleep(totalRunTimeInMilliseconds);
-    periodicTaskScheduler.stop();
-    Assert.assertEquals(totalRunTimeInMilliseconds / 1000L, 
numOfProcessingMessages.get());
+  public void testRandomInitialDelay() {
+    assertTrue(_task.getInitialDelayInSeconds() >= 
ControllerPeriodicTask.MIN_INITIAL_DELAY_IN_SECONDS);
+    assertTrue(_task.getInitialDelayInSeconds() < 
ControllerPeriodicTask.MAX_INITIAL_DELAY_IN_SECONDS);
+
+    assertEquals(_task.getIntervalInSeconds(), RUN_FREQUENCY_IN_SECONDS);
   }
 
   @Test
-  public void testWhenControllerIsNotLeader() throws InterruptedException {
-    long totalRunTimeInMilliseconds = 3_500L;
-    long runFrequencyInSeconds = 1L;
-    long initialDelayInSeconds = 1L;
-
-    when(helixResourceManager.isLeader()).thenReturn(false);
-    PeriodicTask periodicTask = createMockPeriodicTask(runFrequencyInSeconds, 
initialDelayInSeconds);
-
-    PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
-    periodicTaskScheduler.start(Collections.singletonList(periodicTask));
-    Thread.sleep(totalRunTimeInMilliseconds);
-    periodicTaskScheduler.stop();
-    Assert.assertEquals(0, numOfProcessingMessages.get());
-  }
+  public void testChangeLeadership() {
+    // Initial state
+    resetState();
+    _task.init();
+    assertFalse(_onBecomeLeaderCalled.get());
+    assertFalse(_onBecomeNonLeaderCalled.get());
+    assertFalse(_processCalled.get());
+
+    // From non-leader to non-leader
+    resetState();
+    _task.run();
+    assertFalse(_onBecomeLeaderCalled.get());
+    assertFalse(_onBecomeNonLeaderCalled.get());
+    assertFalse(_processCalled.get());
+
+    // From non-leader to leader
+    resetState();
+    when(_resourceManager.isLeader()).thenReturn(true);
+    _task.run();
+    assertTrue(_onBecomeLeaderCalled.get());
+    assertFalse(_onBecomeNonLeaderCalled.get());
+    assertTrue(_processCalled.get());
+
+    // From leader to leader
+    resetState();
+    _task.run();
+    assertFalse(_onBecomeLeaderCalled.get());
+    assertFalse(_onBecomeNonLeaderCalled.get());
+    assertTrue(_processCalled.get());
 
-  private PeriodicTask createMockPeriodicTask(long runFrequencyInSeconds, long 
initialDelayInSeconds) {
-    return new ControllerPeriodicTask("Task", runFrequencyInSeconds, 
initialDelayInSeconds, helixResourceManager) {
-      public void init() {
-        numOfProcessingMessages.set(0);
-      }
-
-      @Override
-      public void process(List<String> allTableNames) {
-        numOfProcessingMessages.incrementAndGet();
-      }
-    };
+    // From leader to non-leader
+    resetState();
+    when(_resourceManager.isLeader()).thenReturn(false);
+    _task.run();
+    assertFalse(_onBecomeLeaderCalled.get());
+    assertTrue(_onBecomeNonLeaderCalled.get());
+    assertFalse(_processCalled.get());
   }
 }
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/BasePeriodicTask.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/BasePeriodicTask.java
index a1f4207..15ee475 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/BasePeriodicTask.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/BasePeriodicTask.java
@@ -19,9 +19,9 @@ package com.linkedin.pinot.core.periodictask;
  * A base class to implement periodic task interface.
  */
 public abstract class BasePeriodicTask implements PeriodicTask {
-  private final String _taskName;
-  private long _intervalInSeconds;
-  private long _initialDelayInSeconds;
+  protected final String _taskName;
+  protected final long _intervalInSeconds;
+  protected final long _initialDelayInSeconds;
 
   public BasePeriodicTask(String taskName, long runFrequencyInSeconds, long 
initialDelayInSeconds) {
     _taskName = taskName;
@@ -43,4 +43,10 @@ public abstract class BasePeriodicTask implements 
PeriodicTask {
   public String getTaskName() {
     return _taskName;
   }
+
+  @Override
+  public String toString() {
+    return String.format("Task: %s, Interval: %ds, Initial Delay: %ds", 
_taskName, _intervalInSeconds,
+        _initialDelayInSeconds);
+  }
 }
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
index f58fffc..e5565de 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.core.periodictask;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -22,51 +23,55 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Periodic task scheduler will schedule a list of tasks based on their 
initial delay time and interval time.
  */
 public class PeriodicTaskScheduler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PeriodicTaskScheduler.class);
-  private static final int CORE_POOL_SIZE = 5;
-  private final ScheduledExecutorService _executorService;
 
-  public PeriodicTaskScheduler() {
-    LOGGER.info("Initializing PeriodicTaskScheduler.");
-    _executorService = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
-  }
+  private ScheduledExecutorService _executorService;
 
   /**
    * Start scheduling periodic tasks.
    */
   public void start(List<PeriodicTask> periodicTasks) {
-    if (periodicTasks == null || periodicTasks.isEmpty()) {
-      LOGGER.warn("No periodic task assigned to scheduler!");
-      return;
+    if (_executorService != null) {
+      LOGGER.warn("Periodic task scheduler already started");
     }
 
-    if (periodicTasks.size() > CORE_POOL_SIZE) {
-      LOGGER.warn("The number of tasks:{} is more than the default number of 
threads:{}.", periodicTasks.size(),
-          CORE_POOL_SIZE);
+    List<PeriodicTask> tasksWithValidInterval = new ArrayList<>();
+    for (PeriodicTask periodicTask : periodicTasks) {
+      if (periodicTask.getIntervalInSeconds() > 0) {
+        tasksWithValidInterval.add(periodicTask);
+      }
     }
 
-    LOGGER.info("Starting PeriodicTaskScheduler.");
-    // Set up an executor that executes tasks periodically
-    for (PeriodicTask periodicTask : periodicTasks) {
-      periodicTask.init();
-      _executorService.scheduleWithFixedDelay(() -> {
-        try {
-          periodicTask.run();
-        } catch (Throwable e) {
-          // catch all errors to prevent subsequent executions from being 
silently suppressed
-          // Ref: 
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
-          LOGGER.warn("Caught exception while running Task: {}", 
periodicTask.getTaskName(), e);
-        }
-      }, periodicTask.getInitialDelayInSeconds(), 
periodicTask.getIntervalInSeconds(), TimeUnit.SECONDS);
+    if (tasksWithValidInterval.isEmpty()) {
+      LOGGER.warn("No periodic task scheduled");
+    } else {
+      LOGGER.info("Starting periodic task scheduler with tasks: {}", 
tasksWithValidInterval);
+      _executorService = 
Executors.newScheduledThreadPool(tasksWithValidInterval.size());
+      for (PeriodicTask periodicTask : periodicTasks) {
+        periodicTask.init();
+        _executorService.scheduleWithFixedDelay(() -> {
+          try {
+            periodicTask.run();
+          } catch (Throwable e) {
+            // catch all errors to prevent subsequent executions from being 
silently suppressed
+            // Ref: 
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
+            LOGGER.warn("Caught exception while running Task: {}", 
periodicTask.getTaskName(), e);
+          }
+        }, periodicTask.getInitialDelayInSeconds(), 
periodicTask.getIntervalInSeconds(), TimeUnit.SECONDS);
+      }
     }
   }
 
   public void stop() {
-    LOGGER.info("Stopping PeriodicTaskScheduler");
-    _executorService.shutdown();
+    if (_executorService != null) {
+      LOGGER.info("Stopping periodic task scheduler");
+      _executorService.shutdown();
+      _executorService = null;
+    }
   }
 }
diff --git 
a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
 
b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
index a2ff21e..78aad1c 100644
--- 
a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
+++ 
b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
@@ -16,171 +16,70 @@
 package com.linkedin.pinot.core.periodictask;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.*;
 
-public class PeriodicTaskSchedulerTest {
-
-  @Test
-  public void testSchedulerWithOneTask() throws InterruptedException {
-    AtomicInteger count = new AtomicInteger(0);
-    PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
-    long runFrequencyInSeconds = 1L;
-    long initialDelayInSeconds = 1L;
-    long totalRunTimeInMilliseconds = 3_500L;
-
-    List<PeriodicTask> periodicTasks = new ArrayList<>();
-    PeriodicTask task = new BasePeriodicTask("Task", runFrequencyInSeconds, 
initialDelayInSeconds) {
-      @Override
-      public void init() {
-        count.set(0);
-      }
-
-      @Override
-      public void run() {
-        // Execute task.
-        count.incrementAndGet();
-      }
-    };
-    periodicTasks.add(task);
-
-    long start = System.currentTimeMillis();
-    periodicTaskScheduler.start(periodicTasks);
-    Thread.sleep(totalRunTimeInMilliseconds);
 
-    periodicTaskScheduler.stop();
-
-    Assert.assertTrue(count.get() > 0);
-    Assert.assertEquals(count.get(), (totalRunTimeInMilliseconds / 
(runFrequencyInSeconds * 1000)));
-    Assert.assertTrue(totalRunTimeInMilliseconds <= 
(System.currentTimeMillis() - start));
-  }
+public class PeriodicTaskSchedulerTest {
 
   @Test
-  public void testSchedulerWithTwoStaggeredTasks() throws InterruptedException 
{
-    AtomicInteger count = new AtomicInteger(0);
-    PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
-    long runFrequencyInSeconds = 2L;
-    long totalRunTimeInMilliseconds = 1_500L;
-
-    List<PeriodicTask> periodicTasks = new ArrayList<>();
-    PeriodicTask task1 = new BasePeriodicTask("Task1", runFrequencyInSeconds, 
0L) {
-      @Override
-      public void init() {
-      }
-
-      @Override
-      public void run() {
-        // Execute task.
-        count.incrementAndGet();
-      }
-    };
-    periodicTasks.add(task1);
+  public void testTaskWithInvalidInterval() throws Exception {
+    AtomicBoolean initCalled = new AtomicBoolean();
+    AtomicBoolean runCalled = new AtomicBoolean();
 
-    // Stagger 2 tasks by delaying the 2nd task half of the frequency.
-    PeriodicTask task2 = new BasePeriodicTask("Task2", runFrequencyInSeconds, 
runFrequencyInSeconds / 2) {
+    List<PeriodicTask> periodicTasks = Collections.singletonList(new 
BasePeriodicTask("TestTask", 0L, 0L) {
       @Override
       public void init() {
+        initCalled.set(true);
       }
 
       @Override
       public void run() {
-        // Execute task.
-        count.decrementAndGet();
+        runCalled.set(true);
       }
-    };
-    periodicTasks.add(task2);
+    });
 
-    long start = System.currentTimeMillis();
-    periodicTaskScheduler.start(periodicTasks);
-    Thread.sleep(totalRunTimeInMilliseconds);
-    periodicTaskScheduler.stop();
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.start(periodicTasks);
+    Thread.sleep(100L);
+    taskScheduler.stop();
 
-    Assert.assertEquals(count.get(), 0);
-    Assert.assertTrue(totalRunTimeInMilliseconds <= 
(System.currentTimeMillis() - start));
+    assertFalse(initCalled.get());
+    assertFalse(runCalled.get());
   }
 
   @Test
-  public void testSchedulerWithTwoTasksDifferentFrequencies() throws 
InterruptedException {
-    long startTime = System.currentTimeMillis();
-    AtomicLong count = new AtomicLong(startTime);
-    AtomicLong count2 = new AtomicLong(startTime);
-    final long[] maxRunTimeForTask1 = {0L};
-    final long[] maxRunTimeForTask2 = {0L};
-    PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
-    long runFrequencyInSeconds = 1L;
-    long totalRunTimeInMilliseconds = 10_000L;
-
-    List<PeriodicTask> periodicTasks = new ArrayList<>();
-    PeriodicTask task1 = new BasePeriodicTask("Task1", runFrequencyInSeconds, 
0L) {
-      @Override
-      public void init() {
-      }
-
-      @Override
-      public void run() {
-        // Calculate the max waiting time between the same task.
-        long lastTime = count.get();
-        long now = System.currentTimeMillis();
-        maxRunTimeForTask1[0] = Math.max(maxRunTimeForTask1[0], (now - 
lastTime));
-        count.set(now);
-      }
-    };
-    periodicTasks.add(task1);
-
-    // The time for Task 2 to run is 4 seconds, which is higher than the 
interval time of Task 1.
-    long TimeToRunMs = 4_000L;
-    PeriodicTask task2 = new BasePeriodicTask("Task2", runFrequencyInSeconds * 
3, 0L) {
-      @Override
-      public void init() {
-      }
-
-      @Override
-      public void run() {
-        // Calculate the max waiting time between the same task.
-        long lastTime = count2.get();
-        long now = System.currentTimeMillis();
-        maxRunTimeForTask2[0] = Math.max(maxRunTimeForTask2[0], (now - 
lastTime));
-        count2.set(now);
-        try {
-          Thread.sleep(TimeToRunMs);
-        } catch (InterruptedException e) {
-          e.printStackTrace();
+  public void testScheduleMultipleTasks() throws Exception {
+    int numTasks = 3;
+    AtomicInteger numTimesInitCalled = new AtomicInteger();
+    AtomicInteger numTimesRunCalled = new AtomicInteger();
+
+    List<PeriodicTask> periodicTasks = new ArrayList<>(numTasks);
+    for (int i = 0; i < numTasks; i++) {
+      periodicTasks.add(new BasePeriodicTask("Task", 1L, 0L) {
+        @Override
+        public void init() {
+          numTimesInitCalled.getAndIncrement();
         }
-      }
-    };
-    periodicTasks.add(task2);
-
-    periodicTaskScheduler.start(periodicTasks);
-    Thread.sleep(totalRunTimeInMilliseconds);
-
-    periodicTaskScheduler.stop();
 
-    Assert.assertTrue(count.get() > startTime);
-    Assert.assertTrue(count2.get() > startTime);
-    // Task1 didn't waited until Task2 finished.
-    Assert.assertTrue(maxRunTimeForTask1[0] - task1.getIntervalInSeconds() * 
1000L < 100L);
-    Assert.assertTrue(maxRunTimeForTask2[0] >= 
Math.max(task2.getIntervalInSeconds() * 1000L, TimeToRunMs));
-  }
-
-  @Test
-  public void testNoTaskAssignedToQueue() throws InterruptedException {
-    AtomicInteger count = new AtomicInteger(0);
-    PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler();
-    long totalRunTimeInMilliseconds = 2_000L;
-
-    // An empty list.
-    List<PeriodicTask> periodicTasks = new ArrayList<>();
-    long start = System.currentTimeMillis();
-    periodicTaskScheduler.start(periodicTasks);
-    Thread.sleep(totalRunTimeInMilliseconds);
+        @Override
+        public void run() {
+          numTimesRunCalled.getAndIncrement();
+        }
+      });
+    }
 
-    periodicTaskScheduler.stop();
+    PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler();
+    taskScheduler.start(periodicTasks);
+    Thread.sleep(1100L);
+    taskScheduler.stop();
 
-    Assert.assertEquals(count.get(), 0);
-    Assert.assertTrue(totalRunTimeInMilliseconds <= 
(System.currentTimeMillis() - start));
+    assertEquals(numTimesInitCalled.get(), numTasks);
+    assertEquals(numTimesRunCalled.get(), numTasks * 2);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to