This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch periodic_task in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit b12a7cf7b6c1fc2715cc0063137dbab79e894257 Author: Xiaotian (Jackie) Jiang <[email protected]> AuthorDate: Tue Nov 13 18:30:02 2018 -0800 Enhance controller periodic task and scheduler For ControllerPeriodicTask: 1. Randomly pick an initial delay in the range of 2-5 mins so that the tasks do not all start at the same time 2. Make leadership information private to the class For PeriodicTaskScheduler: 1. Filter out tasks with non-positive interval, so we can easily disable any task by setting the interval to a non-positive value 2. Set the size of the thread pool to be the same as the number of tasks 3. Reduce the run-time of PeriodicTaskSchedulerTest --- .../pinot/controller/ControllerStarter.java | 16 +- .../controller/helix/SegmentStatusChecker.java | 17 +- .../helix/core/minion/PinotTaskManager.java | 16 +- .../core/periodictask/ControllerPeriodicTask.java | 64 ++++---- .../core/relocation/RealtimeSegmentRelocator.java | 13 +- .../helix/core/retention/RetentionManager.java | 16 +- .../controller/validation/ValidationManager.java | 16 +- .../controller/helix/SegmentStatusCheckerTest.java | 68 +------- .../periodictask/ControllerPeriodicTaskTest.java | 128 ++++++++------- .../pinot/core/periodictask/BasePeriodicTask.java | 12 +- .../core/periodictask/PeriodicTaskScheduler.java | 59 ++++--- .../periodictask/PeriodicTaskSchedulerTest.java | 181 +++++---------------- 12 files changed, 230 insertions(+), 376 deletions(-) diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java index 69a4a4f..dd5041d 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java @@ -38,9 +38,9 @@ import com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrate import 
com.linkedin.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator; 
import com.linkedin.pinot.controller.helix.core.retention.RetentionManager; import com.linkedin.pinot.controller.validation.ValidationManager; +import com.linkedin.pinot.core.crypt.PinotCrypterFactory; import com.linkedin.pinot.core.periodictask.PeriodicTask; import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler; -import com.linkedin.pinot.core.crypt.PinotCrypterFactory; import com.linkedin.pinot.filesystem.PinotFSFactory; import com.yammer.metrics.core.MetricsRegistry; import java.io.File; @@ -166,11 +166,9 @@ public class ControllerStarter { List<PeriodicTask> periodicTasks = new ArrayList<>(); + LOGGER.info("Adding task manager to periodic task scheduler"); _taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _config, _controllerMetrics); - if (_taskManager.getIntervalInSeconds() > 0) { - LOGGER.info("Adding task manager to periodic task scheduler"); - periodicTasks.add(_taskManager); - } + periodicTasks.add(_taskManager); LOGGER.info("Adding retention manager to periodic task scheduler"); periodicTasks.add(_retentionManager); @@ -187,12 +185,8 @@ public class ControllerStarter { _realtimeSegmentsManager.start(_controllerMetrics); PinotLLCRealtimeSegmentManager.getInstance().start(); - if (_segmentStatusChecker.getIntervalInSeconds() == -1L) { - LOGGER.warn("Segment status check interval is -1, status checks disabled."); - } else { - LOGGER.info("Adding segment status checker to periodic task scheduler"); - periodicTasks.add(_segmentStatusChecker); - } + LOGGER.info("Adding segment status checker to periodic task scheduler"); + periodicTasks.add(_segmentStatusChecker); LOGGER.info("Adding realtime segment relocation manager to periodic task scheduler"); periodicTasks.add(_realtimeSegmentRelocator); diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java index 4e51bd6..549dc9d 
100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java @@ -27,8 +27,6 @@ import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicT import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.commons.httpclient.HttpConnectionManager; -import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; import org.apache.helix.HelixAdmin; import org.apache.helix.ZNRecord; import org.apache.helix.model.ExternalView; @@ -69,7 +67,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask { _config = config; _waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds(); _metricsRegistry = metricsRegistry; - HttpConnectionManager httpConnectionManager = new MultiThreadedHttpConnectionManager(); } @Override @@ -85,15 +82,17 @@ public class SegmentStatusChecker extends ControllerPeriodicTask { } @Override - public void process(List<String> allTableNames) { - updateSegmentMetrics(allTableNames); + public void process(List<String> tables) { + updateSegmentMetrics(tables); } /** - * Runs a segment status pass over the currently loaded tables. - * @param allTableNames List of all the table names + * Runs a segment status pass over the given tables. 
+ * TODO: revisit the logic and reduce the ZK access + * + * @param tables List of table names */ - private void updateSegmentMetrics(List<String> allTableNames) { + private void updateSegmentMetrics(List<String> tables) { // Fetch the list of tables String helixClusterName = _pinotHelixResourceManager.getHelixClusterName(); HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin(); @@ -112,7 +111,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask { logDisabledTables = false; } - for (String tableName : allTableNames) { + for (String tableName : tables) { try { if (TableNameBuilder.getTableTypeFromTableName(tableName) == TableType.OFFLINE) { offlineTableCount++; diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java index 460121e..0ff695d 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -52,8 +52,7 @@ public class PinotTaskManager extends ControllerPeriodicTask { public PinotTaskManager(@Nonnull PinotHelixTaskResourceManager helixTaskResourceManager, @Nonnull PinotHelixResourceManager helixResourceManager, @Nonnull ControllerConf controllerConf, @Nonnull ControllerMetrics controllerMetrics) { - super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(), - Math.min(60, controllerConf.getTaskManagerFrequencyInSeconds()), helixResourceManager); + super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(), helixResourceManager); _helixTaskResourceManager = helixTaskResourceManager; _clusterInfoProvider = new ClusterInfoProvider(helixResourceManager, helixTaskResourceManager, controllerConf); _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider); @@ -82,12 
+81,13 @@ public class PinotTaskManager extends ControllerPeriodicTask { } /** - * Check the Pinot cluster status and schedule new tasks. - * @param allTableNames List of all the table names + * Check the Pinot cluster status and schedule new tasks for the given tables. + * + * @param tables List of table names * @return Map from task type to task scheduled */ @Nonnull - private Map<String, String> scheduleTasks(List<String> allTableNames) { + private Map<String, String> scheduleTasks(List<String> tables) { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L); Set<String> taskTypes = _taskGeneratorRegistry.getAllTaskTypes(); @@ -102,7 +102,7 @@ public class PinotTaskManager extends ControllerPeriodicTask { } // Scan all table configs to get the tables with tasks enabled - for (String tableName : allTableNames) { + for (String tableName : tables) { TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableName); if (tableConfig != null) { TableTaskConfig taskConfig = tableConfig.getTaskConfig(); @@ -155,7 +155,7 @@ public class PinotTaskManager extends ControllerPeriodicTask { } @Override - public void process(List<String> allTableNames) { - scheduleTasks(allTableNames); + public void process(List<String> tables) { + scheduleTasks(tables); } } diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java index f00e715..53da198 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java @@ -15,10 +15,10 @@ */ package com.linkedin.pinot.controller.helix.core.periodictask; -import com.google.common.annotations.VisibleForTesting; import 
com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager; import com.linkedin.pinot.core.periodictask.BasePeriodicTask; import java.util.List; +import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,21 +28,29 @@ import org.slf4j.LoggerFactory; * which table resources should be managed by this Pinot controller. */ public abstract class ControllerPeriodicTask extends BasePeriodicTask { - public static final Logger LOGGER = LoggerFactory.getLogger(ControllerPeriodicTask.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ControllerPeriodicTask.class); + + public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120; + public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300; + protected final PinotHelixResourceManager _pinotHelixResourceManager; - private boolean _amILeader; - private static final int DEFAULT_INITIAL_DELAY_IN_SECOND = 120; - public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, - PinotHelixResourceManager pinotHelixResourceManager) { - this(taskName, runFrequencyInSeconds, DEFAULT_INITIAL_DELAY_IN_SECOND, pinotHelixResourceManager); - } + private boolean _wasLeader = false; public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds, PinotHelixResourceManager pinotHelixResourceManager) { super(taskName, runFrequencyInSeconds, initialDelayInSeconds); _pinotHelixResourceManager = pinotHelixResourceManager; - setAmILeader(false); + } + + public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, + PinotHelixResourceManager pinotHelixResourceManager) { + this(taskName, runFrequencyInSeconds, getRandomInitialDelayInSeconds(), pinotHelixResourceManager); + } + + private static long getRandomInitialDelayInSeconds() { + return MIN_INITIAL_DELAY_IN_SECONDS + new Random().nextInt( + MAX_INITIAL_DELAY_IN_SECONDS - MIN_INITIAL_DELAY_IN_SECONDS); } @Override @@ -60,38 +68,29 @@ public abstract class ControllerPeriodicTask 
extends BasePeriodicTask { } private void skipLeaderTask() { - if (getAmILeader()) { + if (_wasLeader) { LOGGER.info("Current pinot controller lost leadership."); onBecomeNotLeader(); + _wasLeader = false; } - setAmILeader(false); - LOGGER.info("Skip running periodic task: {} on non-leader controller", getTaskName()); + LOGGER.info("Skip running periodic task: {} on non-leader controller", _taskName); } - private void processLeaderTask(List<String> allTableNames) { - if (!getAmILeader()) { + private void processLeaderTask(List<String> tables) { + if (!_wasLeader) { LOGGER.info("Current pinot controller became leader. Starting {} with running frequency of {} seconds.", - getTaskName(), getIntervalInSeconds()); + _taskName, _intervalInSeconds); onBecomeLeader(); + _wasLeader = true; } - setAmILeader(true); long startTime = System.currentTimeMillis(); - LOGGER.info("Starting to process {} tables in periodic task: {}", allTableNames.size(), getTaskName()); - process(allTableNames); - LOGGER.info("Finished processing {} tables in periodic task: {} in {}ms", allTableNames.size(), getTaskName(), + int numTables = tables.size(); + LOGGER.info("Start processing {} tables in periodic task: {}", numTables, _taskName); + process(tables); + LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms", numTables, _taskName, (System.currentTimeMillis() - startTime)); } - @VisibleForTesting - public boolean getAmILeader() { - return _amILeader; - } - - @VisibleForTesting - public void setAmILeader(boolean amILeader) { - _amILeader = amILeader; - } - /** * Does the following logic when losing the leadership. This should be done only once during leadership transition. */ @@ -105,8 +104,9 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask { } /** - * Processes the periodic task as lead controller. - * @param allTableNames List of all the table names + * Processes the task on the given tables. 
+ * + * @param tables List of table names */ - public abstract void process(List<String> allTableNames); + public abstract void process(List<String> tables); } diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java index 0954f5b..ebd5f37 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java @@ -58,18 +58,19 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask { } @Override - public void process(List<String> allTableNames) { - runRelocation(allTableNames); + public void process(List<String> tables) { + runRelocation(tables); } /** - * Check all tables. Perform relocation of segments if table is realtime and relocation is required + * Check the given tables. Perform relocation of segments if table is realtime and relocation is required * TODO: Model this to implement {@link com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy} interface * https://github.com/linkedin/pinot/issues/2609 - * @param allTableNames List of all the table names + * + * @param tables List of table names */ - private void runRelocation(List<String> allTableNames) { - for (final String tableNameWithType : allTableNames) { + private void runRelocation(List<String> tables) { + for (final String tableNameWithType : tables) { // Only consider realtime tables. 
if (!TableNameBuilder.REALTIME.tableHasTypeSuffix(tableNameWithType)) { continue; diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java index 7423cef..d76cc3b 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java @@ -51,8 +51,7 @@ public class RetentionManager extends ControllerPeriodicTask { public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, int runFrequencyInSeconds, int deletedSegmentsRetentionInDays) { - super("RetentionManager", runFrequencyInSeconds, Math.min(60, runFrequencyInSeconds), - pinotHelixResourceManager); + super("RetentionManager", runFrequencyInSeconds, pinotHelixResourceManager); _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays; } @@ -63,17 +62,18 @@ public class RetentionManager extends ControllerPeriodicTask { } @Override - public void process(List<String> allTableNames) { - execute(allTableNames); + public void process(List<String> tables) { + execute(tables); } /** - * Manages retention for all tables. - * @param allTableNames List of all the table names + * Manages retention for the given tables. 
+ * + * @param tables List of table names */ - private void execute(List<String> allTableNames) { + private void execute(List<String> tables) { try { - for (String tableNameWithType : allTableNames) { + for (String tableNameWithType : tables) { LOGGER.info("Start managing retention for table: {}", tableNameWithType); manageRetentionForTable(tableNameWithType); } diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java index ce2b404..8717318 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java @@ -60,8 +60,7 @@ public class ValidationManager extends ControllerPeriodicTask { public ValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) { - super("ValidationManager", config.getValidationControllerFrequencyInSeconds(), - config.getValidationControllerFrequencyInSeconds() / 2, pinotHelixResourceManager); + super("ValidationManager", config.getValidationControllerFrequencyInSeconds(), pinotHelixResourceManager); _segmentLevelValidationIntervalInSeconds = config.getSegmentLevelValidationIntervalInSeconds(); Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0); _llcRealtimeSegmentManager = llcRealtimeSegmentManager; @@ -75,15 +74,16 @@ public class ValidationManager extends ControllerPeriodicTask { } @Override - public void process(List<String> allTableNames) { - runValidation(allTableNames); + public void process(List<String> tables) { + runValidation(tables); } /** - * Runs a validation pass over the currently loaded tables. 
- * @param allTableNames List of all the table names + * Runs a validation pass over the given tables. + * + * @param tables List of table names */ - private void runValidation(List<String> allTableNames) { + private void runValidation(List<String> tables) { // Run segment level validation using a separate interval boolean runSegmentLevelValidation = false; long currentTimeMs = System.currentTimeMillis(); @@ -97,7 +97,7 @@ public class ValidationManager extends ControllerPeriodicTask { // Cache instance configs to reduce ZK access List<InstanceConfig> instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs(); - for (String tableNameWithType : allTableNames) { + for (String tableNameWithType : tables) { try { TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); if (tableConfig == null) { diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java index 16248db..de6eca9 100644 --- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -33,13 +33,9 @@ import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.testng.Assert; -import org.testng.annotations.AfterSuite; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.BeforeSuite; import org.testng.annotations.Test; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class SegmentStatusCheckerTest { @@ -49,18 +45,6 @@ public class SegmentStatusCheckerTest { private ControllerMetrics controllerMetrics; private ControllerConf config; - @BeforeSuite - public void setUp() throws Exception { 
- } - - @AfterSuite - public void tearDown() { - } - - @BeforeMethod - public void beforeMethod() { - } - @Test public void offlineBasicTest() throws Exception { final String tableName = "myTable_OFFLINE"; @@ -104,7 +88,6 @@ public class SegmentStatusCheckerTest { metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); - Assert.assertEquals(segmentStatusChecker.getAmILeader(), false); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(), @@ -115,7 +98,6 @@ public class SegmentStatusCheckerTest { ControllerGauge.PERCENT_OF_REPLICAS), 33); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); - Assert.assertEquals(segmentStatusChecker.getAmILeader(), true); } @Test @@ -172,7 +154,6 @@ public class SegmentStatusCheckerTest { metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); - Assert.assertEquals(segmentStatusChecker.getAmILeader(), false); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(), @@ -183,53 +164,6 @@ public class SegmentStatusCheckerTest { ControllerGauge.PERCENT_OF_REPLICAS), 100); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); - Assert.assertEquals(segmentStatusChecker.getAmILeader(), true); - } - - @Test - public void nonLeaderTest() throws Exception { - final String tableName = "myTable_REALTIME"; - List<String> allTableNames = new ArrayList<String>(); - allTableNames.add(tableName); - - HelixAdmin helixAdmin; - { - helixAdmin = 
mock(HelixAdmin.class); - } - { - helixResourceManager = mock(PinotHelixResourceManager.class); - when(helixResourceManager.isLeader()).thenReturn(false); - when(helixResourceManager.getAllTables()).thenReturn(allTableNames); - when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker"); - when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin); - } - { - config = mock(ControllerConf.class); - when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); - when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); - } - metricsRegistry = new MetricsRegistry(); - controllerMetrics = new ControllerMetrics(metricsRegistry); - - // From non-leader to non-leader. - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); - segmentStatusChecker.init(); - segmentStatusChecker.run(); - Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), - Long.MIN_VALUE); - Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS), - Long.MIN_VALUE); - Assert.assertEquals(segmentStatusChecker.getAmILeader(), false); - - // Leadership transition from leader to non-leader. 
- controllerMetrics.setValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE, 0L); - segmentStatusChecker.setAmILeader(true); - segmentStatusChecker.run(); - Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), - Long.MIN_VALUE); - Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS), - Long.MIN_VALUE); - Assert.assertEquals(segmentStatusChecker.getAmILeader(), false); } @Test diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java index 354456f..6a2c1b1 100644 --- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java +++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java @@ -16,75 +16,91 @@ package com.linkedin.pinot.controller.helix.core.periodictask; import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager; -import com.linkedin.pinot.core.periodictask.PeriodicTask; -import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import junit.framework.Assert; -import org.testng.annotations.BeforeTest; +import java.util.concurrent.atomic.AtomicBoolean; import org.testng.annotations.Test; import static org.mockito.Mockito.*; +import static org.testng.Assert.*; public class ControllerPeriodicTaskTest { - private PinotHelixResourceManager helixResourceManager; - private AtomicInteger numOfProcessingMessages; - - @BeforeTest - public void setUp() { - numOfProcessingMessages = new AtomicInteger(0); - helixResourceManager = mock(PinotHelixResourceManager.class); - 
List<String> allTableNames = new ArrayList<>(); - allTableNames.add("testTable_REALTIME"); - allTableNames.add("testTable_OFFLINE"); - when(helixResourceManager.getAllTables()).thenReturn(allTableNames); + private static final long RUN_FREQUENCY_IN_SECONDS = 30; + + private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class); + private final AtomicBoolean _onBecomeLeaderCalled = new AtomicBoolean(); + private final AtomicBoolean _onBecomeNonLeaderCalled = new AtomicBoolean(); + private final AtomicBoolean _processCalled = new AtomicBoolean(); + + private final ControllerPeriodicTask _task = + new ControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, _resourceManager) { + @Override + public void onBecomeLeader() { + _onBecomeLeaderCalled.set(true); + } + + @Override + public void onBecomeNotLeader() { + _onBecomeNonLeaderCalled.set(true); + } + + @Override + public void process(List<String> tables) { + _processCalled.set(true); + } + }; + + private void resetState() { + _onBecomeLeaderCalled.set(false); + _onBecomeNonLeaderCalled.set(false); + _processCalled.set(false); } @Test - public void testWhenControllerIsLeader() throws InterruptedException { - long totalRunTimeInMilliseconds = 3_500L; - long runFrequencyInSeconds = 1L; - long initialDelayInSeconds = 1L; - when(helixResourceManager.isLeader()).thenReturn(true); - - PeriodicTask periodicTask = createMockPeriodicTask(runFrequencyInSeconds, initialDelayInSeconds); - - PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler(); - periodicTaskScheduler.start(Collections.singletonList(periodicTask)); - Thread.sleep(totalRunTimeInMilliseconds); - periodicTaskScheduler.stop(); - Assert.assertEquals(totalRunTimeInMilliseconds / 1000L, numOfProcessingMessages.get()); + public void testRandomInitialDelay() { + assertTrue(_task.getInitialDelayInSeconds() >= ControllerPeriodicTask.MIN_INITIAL_DELAY_IN_SECONDS); + assertTrue(_task.getInitialDelayInSeconds() < 
ControllerPeriodicTask.MAX_INITIAL_DELAY_IN_SECONDS); + + assertEquals(_task.getIntervalInSeconds(), RUN_FREQUENCY_IN_SECONDS); } @Test - public void testWhenControllerIsNotLeader() throws InterruptedException { - long totalRunTimeInMilliseconds = 3_500L; - long runFrequencyInSeconds = 1L; - long initialDelayInSeconds = 1L; - - when(helixResourceManager.isLeader()).thenReturn(false); - PeriodicTask periodicTask = createMockPeriodicTask(runFrequencyInSeconds, initialDelayInSeconds); - - PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler(); - periodicTaskScheduler.start(Collections.singletonList(periodicTask)); - Thread.sleep(totalRunTimeInMilliseconds); - periodicTaskScheduler.stop(); - Assert.assertEquals(0, numOfProcessingMessages.get()); - } + public void testChangeLeadership() { + // Initial state + resetState(); + _task.init(); + assertFalse(_onBecomeLeaderCalled.get()); + assertFalse(_onBecomeNonLeaderCalled.get()); + assertFalse(_processCalled.get()); + + // From non-leader to non-leader + resetState(); + _task.run(); + assertFalse(_onBecomeLeaderCalled.get()); + assertFalse(_onBecomeNonLeaderCalled.get()); + assertFalse(_processCalled.get()); + + // From non-leader to leader + resetState(); + when(_resourceManager.isLeader()).thenReturn(true); + _task.run(); + assertTrue(_onBecomeLeaderCalled.get()); + assertFalse(_onBecomeNonLeaderCalled.get()); + assertTrue(_processCalled.get()); + + // From leader to leader + resetState(); + _task.run(); + assertFalse(_onBecomeLeaderCalled.get()); + assertFalse(_onBecomeNonLeaderCalled.get()); + assertTrue(_processCalled.get()); - private PeriodicTask createMockPeriodicTask(long runFrequencyInSeconds, long initialDelayInSeconds) { - return new ControllerPeriodicTask("Task", runFrequencyInSeconds, initialDelayInSeconds, helixResourceManager) { - public void init() { - numOfProcessingMessages.set(0); - } - - @Override - public void process(List<String> allTableNames) { - 
numOfProcessingMessages.incrementAndGet(); - } - }; + // From leader to non-leader + resetState(); + when(_resourceManager.isLeader()).thenReturn(false); + _task.run(); + assertFalse(_onBecomeLeaderCalled.get()); + assertTrue(_onBecomeNonLeaderCalled.get()); + assertFalse(_processCalled.get()); } } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/BasePeriodicTask.java b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/BasePeriodicTask.java index a1f4207..15ee475 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/BasePeriodicTask.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/BasePeriodicTask.java @@ -19,9 +19,9 @@ package com.linkedin.pinot.core.periodictask; * A base class to implement periodic task interface. */ public abstract class BasePeriodicTask implements PeriodicTask { - private final String _taskName; - private long _intervalInSeconds; - private long _initialDelayInSeconds; + protected final String _taskName; + protected final long _intervalInSeconds; + protected final long _initialDelayInSeconds; public BasePeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds) { _taskName = taskName; @@ -43,4 +43,10 @@ public abstract class BasePeriodicTask implements PeriodicTask { public String getTaskName() { return _taskName; } + + @Override + public String toString() { + return String.format("Task: %s, Interval: %ds, Initial Delay: %ds", _taskName, _intervalInSeconds, + _initialDelayInSeconds); + } } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java index f58fffc..e5565de 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java @@ -15,6 +15,7 @@ */ package 
com.linkedin.pinot.core.periodictask; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -22,51 +23,56 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * Periodic task scheduler will schedule a list of tasks based on their initial delay time and interval time. */ public class PeriodicTaskScheduler { private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicTaskScheduler.class); - private static final int CORE_POOL_SIZE = 5; - private final ScheduledExecutorService _executorService; - public PeriodicTaskScheduler() { - LOGGER.info("Initializing PeriodicTaskScheduler."); - _executorService = Executors.newScheduledThreadPool(CORE_POOL_SIZE); - } + private ScheduledExecutorService _executorService; /** * Start scheduling periodic tasks. */ public void start(List<PeriodicTask> periodicTasks) { - if (periodicTasks == null || periodicTasks.isEmpty()) { - LOGGER.warn("No periodic task assigned to scheduler!"); - return; + if (_executorService != null) { + LOGGER.warn("Periodic task scheduler already started"); + return; } - if (periodicTasks.size() > CORE_POOL_SIZE) { - LOGGER.warn("The number of tasks:{} is more than the default number of threads:{}.", periodicTasks.size(), - CORE_POOL_SIZE); + List<PeriodicTask> tasksWithValidInterval = new ArrayList<>(); + for (PeriodicTask periodicTask : periodicTasks) { + if (periodicTask.getIntervalInSeconds() > 0) { + tasksWithValidInterval.add(periodicTask); + } } - LOGGER.info("Starting PeriodicTaskScheduler."); - // Set up an executor that executes tasks periodically - for (PeriodicTask periodicTask : periodicTasks) { - periodicTask.init(); - _executorService.scheduleWithFixedDelay(() -> { - try { - periodicTask.run(); - } catch (Throwable e) { - // catch all errors to prevent subsequent executions from being silently suppressed - // Ref: 
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit- - LOGGER.warn("Caught exception while running Task: {}", periodicTask.getTaskName(), e); - } - }, periodicTask.getInitialDelayInSeconds(), periodicTask.getIntervalInSeconds(), TimeUnit.SECONDS); + if (tasksWithValidInterval.isEmpty()) { + LOGGER.warn("No periodic task scheduled"); + } else { + LOGGER.info("Starting periodic task scheduler with tasks: {}", tasksWithValidInterval); + _executorService = Executors.newScheduledThreadPool(tasksWithValidInterval.size()); + for (PeriodicTask periodicTask : tasksWithValidInterval) { + periodicTask.init(); + _executorService.scheduleWithFixedDelay(() -> { + try { + periodicTask.run(); + } catch (Throwable e) { + // catch all errors to prevent subsequent executions from being silently suppressed + // Ref: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit- + LOGGER.warn("Caught exception while running Task: {}", periodicTask.getTaskName(), e); + } + }, periodicTask.getInitialDelayInSeconds(), periodicTask.getIntervalInSeconds(), TimeUnit.SECONDS); + } } } public void stop() { - LOGGER.info("Stopping PeriodicTaskScheduler"); - _executorService.shutdown(); + if (_executorService != null) { + LOGGER.info("Stopping periodic task scheduler"); + _executorService.shutdown(); + _executorService = null; + } } } diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java index a2ff21e..78aad1c 100644 --- a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java +++ b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java @@ -16,171 +16,70 
@@ package com.linkedin.pinot.core.periodictask; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import org.testng.Assert; import org.testng.annotations.Test; +import static org.testng.Assert.*; -public class PeriodicTaskSchedulerTest { - - @Test - public void testSchedulerWithOneTask() throws InterruptedException { - AtomicInteger count = new AtomicInteger(0); - PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler(); - long runFrequencyInSeconds = 1L; - long initialDelayInSeconds = 1L; - long totalRunTimeInMilliseconds = 3_500L; - - List<PeriodicTask> periodicTasks = new ArrayList<>(); - PeriodicTask task = new BasePeriodicTask("Task", runFrequencyInSeconds, initialDelayInSeconds) { - @Override - public void init() { - count.set(0); - } - - @Override - public void run() { - // Execute task. 
- count.incrementAndGet(); - } - }; - periodicTasks.add(task); - - long start = System.currentTimeMillis(); - periodicTaskScheduler.start(periodicTasks); - Thread.sleep(totalRunTimeInMilliseconds); - periodicTaskScheduler.stop(); - - Assert.assertTrue(count.get() > 0); - Assert.assertEquals(count.get(), (totalRunTimeInMilliseconds / (runFrequencyInSeconds * 1000))); - Assert.assertTrue(totalRunTimeInMilliseconds <= (System.currentTimeMillis() - start)); - } +public class PeriodicTaskSchedulerTest { @Test - public void testSchedulerWithTwoStaggeredTasks() throws InterruptedException { - AtomicInteger count = new AtomicInteger(0); - PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler(); - long runFrequencyInSeconds = 2L; - long totalRunTimeInMilliseconds = 1_500L; - - List<PeriodicTask> periodicTasks = new ArrayList<>(); - PeriodicTask task1 = new BasePeriodicTask("Task1", runFrequencyInSeconds, 0L) { - @Override - public void init() { - } - - @Override - public void run() { - // Execute task. - count.incrementAndGet(); - } - }; - periodicTasks.add(task1); + public void testTaskWithInvalidInterval() throws Exception { + AtomicBoolean initCalled = new AtomicBoolean(); + AtomicBoolean runCalled = new AtomicBoolean(); - // Stagger 2 tasks by delaying the 2nd task half of the frequency. - PeriodicTask task2 = new BasePeriodicTask("Task2", runFrequencyInSeconds, runFrequencyInSeconds / 2) { + List<PeriodicTask> periodicTasks = Collections.singletonList(new BasePeriodicTask("TestTask", 0L, 0L) { @Override public void init() { + initCalled.set(true); } @Override public void run() { - // Execute task. 
- count.decrementAndGet(); + runCalled.set(true); } - }; - periodicTasks.add(task2); + }); - long start = System.currentTimeMillis(); - periodicTaskScheduler.start(periodicTasks); - Thread.sleep(totalRunTimeInMilliseconds); - periodicTaskScheduler.stop(); + PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler(); + taskScheduler.start(periodicTasks); + Thread.sleep(100L); + taskScheduler.stop(); - Assert.assertEquals(count.get(), 0); - Assert.assertTrue(totalRunTimeInMilliseconds <= (System.currentTimeMillis() - start)); + assertFalse(initCalled.get()); + assertFalse(runCalled.get()); } @Test - public void testSchedulerWithTwoTasksDifferentFrequencies() throws InterruptedException { - long startTime = System.currentTimeMillis(); - AtomicLong count = new AtomicLong(startTime); - AtomicLong count2 = new AtomicLong(startTime); - final long[] maxRunTimeForTask1 = {0L}; - final long[] maxRunTimeForTask2 = {0L}; - PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler(); - long runFrequencyInSeconds = 1L; - long totalRunTimeInMilliseconds = 10_000L; - - List<PeriodicTask> periodicTasks = new ArrayList<>(); - PeriodicTask task1 = new BasePeriodicTask("Task1", runFrequencyInSeconds, 0L) { - @Override - public void init() { - } - - @Override - public void run() { - // Calculate the max waiting time between the same task. - long lastTime = count.get(); - long now = System.currentTimeMillis(); - maxRunTimeForTask1[0] = Math.max(maxRunTimeForTask1[0], (now - lastTime)); - count.set(now); - } - }; - periodicTasks.add(task1); - - // The time for Task 2 to run is 4 seconds, which is higher than the interval time of Task 1. - long TimeToRunMs = 4_000L; - PeriodicTask task2 = new BasePeriodicTask("Task2", runFrequencyInSeconds * 3, 0L) { - @Override - public void init() { - } - - @Override - public void run() { - // Calculate the max waiting time between the same task. 
- long lastTime = count2.get(); - long now = System.currentTimeMillis(); - maxRunTimeForTask2[0] = Math.max(maxRunTimeForTask2[0], (now - lastTime)); - count2.set(now); - try { - Thread.sleep(TimeToRunMs); - } catch (InterruptedException e) { - e.printStackTrace(); + public void testScheduleMultipleTasks() throws Exception { + int numTasks = 3; + AtomicInteger numTimesInitCalled = new AtomicInteger(); + AtomicInteger numTimesRunCalled = new AtomicInteger(); + + List<PeriodicTask> periodicTasks = new ArrayList<>(numTasks); + for (int i = 0; i < numTasks; i++) { + periodicTasks.add(new BasePeriodicTask("Task", 1L, 0L) { + @Override + public void init() { + numTimesInitCalled.getAndIncrement(); } - } - }; - periodicTasks.add(task2); - - periodicTaskScheduler.start(periodicTasks); - Thread.sleep(totalRunTimeInMilliseconds); - - periodicTaskScheduler.stop(); - Assert.assertTrue(count.get() > startTime); - Assert.assertTrue(count2.get() > startTime); - // Task1 didn't waited until Task2 finished. - Assert.assertTrue(maxRunTimeForTask1[0] - task1.getIntervalInSeconds() * 1000L < 100L); - Assert.assertTrue(maxRunTimeForTask2[0] >= Math.max(task2.getIntervalInSeconds() * 1000L, TimeToRunMs)); - } - - @Test - public void testNoTaskAssignedToQueue() throws InterruptedException { - AtomicInteger count = new AtomicInteger(0); - PeriodicTaskScheduler periodicTaskScheduler = new PeriodicTaskScheduler(); - long totalRunTimeInMilliseconds = 2_000L; - - // An empty list. 
- List<PeriodicTask> periodicTasks = new ArrayList<>(); - long start = System.currentTimeMillis(); - periodicTaskScheduler.start(periodicTasks); - Thread.sleep(totalRunTimeInMilliseconds); + @Override + public void run() { + numTimesRunCalled.getAndIncrement(); + } + }); + } - periodicTaskScheduler.stop(); + PeriodicTaskScheduler taskScheduler = new PeriodicTaskScheduler(); + taskScheduler.start(periodicTasks); + Thread.sleep(1100L); + taskScheduler.stop(); - Assert.assertEquals(count.get(), 0); - Assert.assertTrue(totalRunTimeInMilliseconds <= (System.currentTimeMillis() - start)); + assertEquals(numTimesInitCalled.get(), numTasks); + assertEquals(numTimesRunCalled.get(), numTasks * 2); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
