This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch start_stop_periodic_tasks_on_leadership_changes in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 037cbc08ab9c9b86d0233311da0312d30b3462da Author: Neha Pawar <[email protected]> AuthorDate: Wed Dec 19 17:49:25 2018 -0800 Start and stop ControllerPeriodicTasks based on leadership changes --- .../pinot/controller/ControllerStarter.java | 9 ++- .../controller/helix/SegmentStatusChecker.java | 12 +-- .../helix/core/minion/PinotTaskManager.java | 25 +++--- .../core/periodictask/ControllerPeriodicTask.java | 94 +++++++++++----------- .../ControllerPeriodicTaskScheduler.java | 36 +++++++++ .../core/relocation/RealtimeSegmentRelocator.java | 5 ++ .../helix/core/retention/RetentionManager.java | 9 ++- .../controller/validation/ValidationManager.java | 12 +-- .../controller/helix/SegmentStatusCheckerTest.java | 33 +++----- .../periodictask/ControllerPeriodicTaskTest.java | 76 +++++++++-------- .../helix/core/retention/RetentionManagerTest.java | 16 +--- .../pinot/core/periodictask/PeriodicTask.java | 5 ++ .../core/periodictask/PeriodicTaskScheduler.java | 23 ++++-- .../periodictask/PeriodicTaskSchedulerTest.java | 14 ++++ 14 files changed, 209 insertions(+), 160 deletions(-) diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java index 33fb5ce..1f3b316 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java @@ -32,6 +32,7 @@ import com.linkedin.pinot.controller.helix.SegmentStatusChecker; import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager; import com.linkedin.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import com.linkedin.pinot.controller.helix.core.minion.PinotTaskManager; +import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTaskScheduler; import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import com.linkedin.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager; import com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory; @@ -40,7 +41,6 @@ import com.linkedin.pinot.controller.helix.core.retention.RetentionManager; import com.linkedin.pinot.controller.validation.ValidationManager; import com.linkedin.pinot.core.crypt.PinotCrypterFactory; import com.linkedin.pinot.core.periodictask.PeriodicTask; -import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler; import com.linkedin.pinot.filesystem.PinotFSFactory; import com.yammer.metrics.core.MetricsRegistry; import java.io.File; @@ -80,7 +80,7 @@ public class ControllerStarter { private final PinotRealtimeSegmentManager _realtimeSegmentsManager; private final SegmentStatusChecker _segmentStatusChecker; private final ExecutorService _executorService; - private final PeriodicTaskScheduler _periodicTaskScheduler; + private final ControllerPeriodicTaskScheduler _periodicTaskScheduler; // Can only be constructed after resource manager getting started private ValidationManager _validationManager; @@ -101,7 +101,7 @@ public class ControllerStarter { Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("restapi-multiget-thread-%d").build()); _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _config, _controllerMetrics); _realtimeSegmentRelocator = new RealtimeSegmentRelocator(_helixResourceManager, _config); - _periodicTaskScheduler = new PeriodicTaskScheduler(); + _periodicTaskScheduler = new ControllerPeriodicTaskScheduler(); } public PinotHelixResourceManager getHelixResourceManager() { @@ -183,7 +183,8 @@ public class ControllerStarter { periodicTasks.add(_validationManager); periodicTasks.add(_segmentStatusChecker); periodicTasks.add(_realtimeSegmentRelocator); - _periodicTaskScheduler.start(periodicTasks); + _periodicTaskScheduler.init(periodicTasks); + LOGGER.info("Creating rebalance segments factory"); RebalanceSegmentStrategyFactory.createInstance(helixManager); diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java index 8059d9e..a1550f8 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java @@ -84,12 +84,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask { } @Override - public void onBecomeNotLeader() { - LOGGER.info("Resetting table metrics for all the tables."); - setStatusToDefault(); - } - - @Override protected void preprocess() { _realTimeTableCount = 0; _offlineTableCount = 0; @@ -267,4 +261,10 @@ public class SegmentStatusChecker extends ControllerPeriodicTask { _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE, Long.MIN_VALUE); _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, Long.MIN_VALUE); } + + @Override + public void cleanup() { + LOGGER.info("Resetting table metrics for all the tables."); + setStatusToDefault(); + } } diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java index 729c75c..76c030f 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -93,19 +93,6 @@ public class PinotTaskManager extends ControllerPeriodicTask { return getTasksScheduled(); } - /** - * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes. - */ - @Override - public void onBecomeNotLeader() { - LOGGER.info("Perform task cleanups."); - // Performs necessary cleanups for each task type. - for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) { - _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp(); - } - } - - @Override protected void preprocess() { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L); @@ -163,4 +150,16 @@ public class PinotTaskManager extends ControllerPeriodicTask { private Map<String, String> getTasksScheduled() { return _tasksScheduled; } + + /** + * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes. + */ + @Override + public void cleanup() { + LOGGER.info("Perform task cleanups."); + // Performs necessary cleanups for each task type. + for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) { + _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp(); + } + } } diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java index c416d0e..3a9d045 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java @@ -16,7 +16,6 @@ package com.linkedin.pinot.controller.helix.core.periodictask; import com.google.common.annotations.VisibleForTesting; -import com.linkedin.pinot.controller.ControllerLeadershipManager; import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager; import com.linkedin.pinot.core.periodictask.BasePeriodicTask; import java.util.List; @@ -36,9 +35,12 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask { public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120; public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300; + private static final long MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L; + protected final PinotHelixResourceManager _pinotHelixResourceManager; - private boolean _isLeader = false; + private volatile boolean _stopPeriodicTask = false; + private volatile boolean _periodicTaskInProgress = false; public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds, PinotHelixResourceManager pinotHelixResourceManager) { @@ -61,61 +63,61 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask { @Override public void run() { - if (!isLeader()) { - skipLeaderTask(); - } else { - List<String> allTableNames = _pinotHelixResourceManager.getAllTables(); - processLeaderTask(allTableNames); - } - } - - private void skipLeaderTask() { - if (_isLeader) { - LOGGER.info("Current pinot controller lost leadership."); - _isLeader = false; - onBecomeNotLeader(); - } - LOGGER.info("Skip running periodic task: {} on non-leader controller", _taskName); - } - - private void processLeaderTask(List<String> tables) { - if (!_isLeader) { - LOGGER.info("Current pinot controller became leader. Starting {} with running frequency of {} seconds.", - _taskName, _intervalInSeconds); - _isLeader = true; - onBecomeLeader(); - } + _periodicTaskInProgress = true; + List<String> tableNamesWithType = _pinotHelixResourceManager.getAllTables(); long startTime = System.currentTimeMillis(); - int numTables = tables.size(); + int numTables = tableNamesWithType.size(); LOGGER.info("Start processing {} tables in periodic task: {}", numTables, _taskName); - process(tables); + process(tableNamesWithType); LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms", numTables, _taskName, (System.currentTimeMillis() - startTime)); + _periodicTaskInProgress = false; } - /** - * Does the following logic when losing the leadership. This should be done only once during leadership transition. - */ - public void onBecomeNotLeader() { - } - /** - * Does the following logic when becoming lead controller. This should be done only once during leadership transition. - */ - public void onBecomeLeader() { + @Override + public void stop() { + _stopPeriodicTask = true; + + LOGGER.info("Waiting for periodic task {} to finish, maxWaitTimeMillis = {}", _taskName, + MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS); + long millisToWait = MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS; + while (_periodicTaskInProgress && millisToWait > 0) { + try { + long thisWait = 1000; + if (millisToWait < thisWait) { + thisWait = millisToWait; + } + Thread.sleep(thisWait); + millisToWait -= thisWait; + } catch (InterruptedException e) { + LOGGER.info("Interrupted: Remaining wait time {} (out of {})", millisToWait, + MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS); + break; + } + } + LOGGER.info("Wait completed. _periodicTaskInProgress = {}", _periodicTaskInProgress); + + cleanup(); } + /** * Processes the task on the given tables. * - * @param tables List of table names + * @param tableNamesWithType List of table names */ - protected void process(List<String> tables) { - preprocess(); - for (String table : tables) { - processTable(table); + protected void process(List<String> tableNamesWithType) { + if (!isStopPeriodicTask()) { + preprocess(); + for (String table : tableNamesWithType) { + if (isStopPeriodicTask()) { + break; + } + processTable(table); + } + postprocess(); } - postprocess(); } /** @@ -135,7 +137,9 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask { protected abstract void postprocess(); @VisibleForTesting - protected boolean isLeader() { - return ControllerLeadershipManager.getInstance().isLeader(); + protected boolean isStopPeriodicTask() { + return _stopPeriodicTask; } + + protected abstract void cleanup(); } diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java new file mode 100644 index 0000000..15b18c0 --- /dev/null +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java @@ -0,0 +1,36 @@ +package com.linkedin.pinot.controller.helix.core.periodictask; + +import com.linkedin.pinot.controller.ControllerLeadershipManager; +import com.linkedin.pinot.controller.LeadershipChangeSubscriber; +import com.linkedin.pinot.core.periodictask.PeriodicTask; +import com.linkedin.pinot.core.periodictask.PeriodicTaskScheduler; +import java.util.List; + + +/** + * A {@link PeriodicTaskScheduler} for scheduling {@link ControllerPeriodicTask} which are created on controller startup + * and started/stopped on controller leadership changes + */ +public class ControllerPeriodicTaskScheduler extends PeriodicTaskScheduler implements LeadershipChangeSubscriber { + + private List<PeriodicTask> _controllerPeriodicTasks; + + /** + * Initialize the {@link ControllerPeriodicTaskScheduler} with the {@link ControllerPeriodicTask} created at startup + * @param controllerPeriodicTasks + */ + public void init(List<PeriodicTask> controllerPeriodicTasks) { + _controllerPeriodicTasks = controllerPeriodicTasks; + ControllerLeadershipManager.getInstance().subscribe(ControllerPeriodicTaskScheduler.class.getName(), this); + } + + @Override + public void onBecomingLeader() { + start(_controllerPeriodicTasks); + } + + @Override + public void onBecomingNonLeader() { + stop(); + } +} diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java index e324c11..90c9435 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java @@ -269,4 +269,9 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask { } return seconds; } + + @Override + public void cleanup() { + + } } diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java index 80894fc..5127835 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java @@ -53,10 +53,7 @@ public class RetentionManager extends ControllerPeriodicTask { int deletedSegmentsRetentionInDays) { super("RetentionManager", runFrequencyInSeconds, pinotHelixResourceManager); _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays; - } - @Override - public void onBecomeLeader() { LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}", getIntervalInSeconds(), _deletedSegmentsRetentionInDays); } @@ -185,4 +182,10 @@ public class RetentionManager extends ControllerPeriodicTask { CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.OFFLINE); } } + + + @Override + public void cleanup() { + + } } diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java index 18025eb..79a9d1b 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java @@ -70,12 +70,6 @@ public class ValidationManager extends ControllerPeriodicTask { } @Override - public void onBecomeNotLeader() { - LOGGER.info("Unregister all the validation metrics."); - _validationMetrics.unregisterAllMetrics(); - } - - @Override protected void preprocess() { // Run segment level validation using a separate interval _runSegmentLevelValidation = false; @@ -312,4 +306,10 @@ public class ValidationManager extends ControllerPeriodicTask { return numTotalDocs; } + + @Override + public void cleanup() { + LOGGER.info("Unregister all the validation metrics."); + _validationMetrics.unregisterAllMetrics(); + } } diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java index 4d4c324..1246991 100644 --- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -86,7 +86,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(), @@ -151,7 +151,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(), @@ -228,7 +228,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(), @@ -274,7 +274,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, @@ -308,7 +308,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, @@ -376,7 +376,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(), @@ -420,7 +420,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, @@ -458,7 +458,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, @@ -504,7 +504,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); // verify state before test Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge( ControllerGauge.DISABLED_TABLE_COUNT), 0); @@ -555,7 +555,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, @@ -567,17 +567,4 @@ public class SegmentStatusCheckerTest { Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); } - - private class MockSegmentStatusChecker extends SegmentStatusChecker { - - public MockSegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config, - ControllerMetrics metricsRegistry) { - super(pinotHelixResourceManager, config, metricsRegistry); - } - - @Override - protected boolean isLeader() { - return true; - } - } } diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java index 94c9f52..c4903f7 100644 --- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java +++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java @@ -28,33 +28,32 @@ public class ControllerPeriodicTaskTest { private static final long RUN_FREQUENCY_IN_SECONDS = 30; private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class); - private final AtomicBoolean _onBecomeLeaderCalled = new AtomicBoolean(); - private final AtomicBoolean _onBecomeNonLeaderCalled = new AtomicBoolean(); + private final AtomicBoolean _cleanupCalled = new AtomicBoolean(); private final AtomicBoolean _processCalled = new AtomicBoolean(); + private final AtomicBoolean _processTableCalled = new AtomicBoolean(); private final MockControllerPeriodicTask _task = new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, _resourceManager) { - @Override - public void onBecomeLeader() { - _onBecomeLeaderCalled.set(true); - } @Override - public void onBecomeNotLeader() { - _onBecomeNonLeaderCalled.set(true); + public void cleanup() { + _cleanupCalled.set(true); } @Override - public void process(List<String> tables) { + public void process(List<String> tableNamesWithType) { _processCalled.set(true); } + @Override + public void processTable(String tableNameWithType) { _processTableCalled.set(true);} + }; private void resetState() { - _onBecomeLeaderCalled.set(false); - _onBecomeNonLeaderCalled.set(false); + _cleanupCalled.set(false); _processCalled.set(false); + _processTableCalled.set(false); } @Test @@ -66,56 +65,48 @@ public class ControllerPeriodicTaskTest { } @Test - public void testChangeLeadership() { + public void testControllerPeriodicTaskCalls() { // Initial state resetState(); - _task.setLeader(false); _task.init(); - assertFalse(_onBecomeLeaderCalled.get()); - assertFalse(_onBecomeNonLeaderCalled.get()); - assertFalse(_processCalled.get()); - - // From non-leader to non-leader - resetState(); - _task.run(); - assertFalse(_onBecomeLeaderCalled.get()); - assertFalse(_onBecomeNonLeaderCalled.get()); + assertFalse(_cleanupCalled.get()); assertFalse(_processCalled.get()); + assertFalse(_processTableCalled.get()); - // From non-leader to leader + // run task resetState(); - _task.setLeader(true); _task.run(); - assertTrue(_onBecomeLeaderCalled.get()); - assertFalse(_onBecomeNonLeaderCalled.get()); + assertFalse(_cleanupCalled.get()); assertTrue(_processCalled.get()); + assertFalse(_processTableCalled.get()); - // From leader to leader + // stop periodic task flag set, task will not run resetState(); + _task.setStopPeriodicTask(true); _task.run(); - assertFalse(_onBecomeLeaderCalled.get()); - assertFalse(_onBecomeNonLeaderCalled.get()); + assertFalse(_cleanupCalled.get()); assertTrue(_processCalled.get()); + assertFalse(_processTableCalled.get()); - // From leader to non-leader + // stop periodic task resetState(); - _task.setLeader(false); - _task.run(); - assertFalse(_onBecomeLeaderCalled.get()); - assertTrue(_onBecomeNonLeaderCalled.get()); + _task.stop(); + assertTrue(_cleanupCalled.get()); assertFalse(_processCalled.get()); + assertFalse(_processTableCalled.get()); + } private class MockControllerPeriodicTask extends ControllerPeriodicTask { - private boolean _isLeader = true; + private boolean _isStopPeriodicTask = false; public MockControllerPeriodicTask(String taskName, long runFrequencyInSeconds, PinotHelixResourceManager pinotHelixResourceManager) { super(taskName, runFrequencyInSeconds, pinotHelixResourceManager); } @Override - protected void process(List<String> tables) { + protected void process(List<String> tableNamesWithType) { } @@ -135,12 +126,17 @@ public class ControllerPeriodicTaskTest { } @Override - protected boolean isLeader() { - return _isLeader; + protected boolean isStopPeriodicTask() { + return _isStopPeriodicTask; } - void setLeader(boolean isLeader) { - _isLeader = isLeader; + void setStopPeriodicTask(boolean isStopPeriodicTask) { + _isStopPeriodicTask = isStopPeriodicTask; + } + + @Override + public void cleanup() { + } } } diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java index 8e25505..c27b0ef 100644 --- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java +++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java @@ -84,7 +84,7 @@ public class RetentionManagerTest { when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList); - RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0); + RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0); retentionManager.init(); retentionManager.run(); @@ -201,7 +201,7 @@ public class RetentionManagerTest { setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments); setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager); - RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0); + RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0); retentionManager.init(); retentionManager.run(); @@ -306,16 +306,4 @@ public class RetentionManagerTest { return segmentMetadata; } - private class MockRetentionManager extends RetentionManager { - - public MockRetentionManager(PinotHelixResourceManager pinotHelixResourceManager, int runFrequencyInSeconds, - int deletedSegmentsRetentionInDays) { - super(pinotHelixResourceManager, runFrequencyInSeconds, deletedSegmentsRetentionInDays); - } - - @Override - protected boolean isLeader() { - return true; - } - } } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java index fac0750..f5b9c60 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTask.java @@ -43,4 +43,9 @@ public interface PeriodicTask extends Runnable { * @return task name. */ String getTaskName(); + + /** + * Stop the periodic task + */ + void stop(); } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java index cd07ea4..81c92a0 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/periodictask/PeriodicTaskScheduler.java @@ -31,6 +31,7 @@ public class PeriodicTaskScheduler { private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicTaskScheduler.class); private ScheduledExecutorService _executorService; + private List<PeriodicTask> _tasksWithValidInterval; /** * Start scheduling periodic tasks. @@ -40,25 +41,27 @@ public class PeriodicTaskScheduler { LOGGER.warn("Periodic task scheduler already started"); } - List<PeriodicTask> tasksWithValidInterval = new ArrayList<>(); + _tasksWithValidInterval = new ArrayList<>(); for (PeriodicTask periodicTask : periodicTasks) { if (periodicTask.getIntervalInSeconds() > 0) { LOGGER.info("Adding periodic task: {}", periodicTask); - tasksWithValidInterval.add(periodicTask); + _tasksWithValidInterval.add(periodicTask); } else { LOGGER.info("Skipping periodic task: {}", periodicTask); } } - if (tasksWithValidInterval.isEmpty()) { + if (_tasksWithValidInterval.isEmpty()) { LOGGER.warn("No periodic task scheduled"); } else { - LOGGER.info("Starting periodic task scheduler with tasks: {}", tasksWithValidInterval); - _executorService = Executors.newScheduledThreadPool(tasksWithValidInterval.size()); - for (PeriodicTask periodicTask : tasksWithValidInterval) { + LOGGER.info("Starting periodic task scheduler with tasks: {}", _tasksWithValidInterval); + _executorService = Executors.newScheduledThreadPool(_tasksWithValidInterval.size()); + for (PeriodicTask periodicTask : _tasksWithValidInterval) { periodicTask.init(); _executorService.scheduleWithFixedDelay(() -> { try { + LOGGER.info("Starting {} with running frequency of {} seconds.", periodicTask.getTaskName(), + periodicTask.getIntervalInSeconds()); periodicTask.run(); } catch (Throwable e) { // catch all errors to prevent subsequent executions from being silently suppressed @@ -70,11 +73,19 @@ public class PeriodicTaskScheduler { } } + /** + * Shutdown executor service and stop the periodic tasks + */ public void stop() { if (_executorService != null) { LOGGER.info("Stopping periodic task scheduler"); _executorService.shutdown(); _executorService = null; } + + if (_tasksWithValidInterval != null) { + LOGGER.info("Stopping all periodic tasks: {}", _tasksWithValidInterval); + _tasksWithValidInterval.parallelStream().forEach(PeriodicTask::stop); + } } } diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java index a105435..fbb3d92 100644 --- a/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java +++ b/pinot-core/src/test/java/com/linkedin/pinot/core/periodictask/PeriodicTaskSchedulerTest.java @@ -31,6 +31,7 @@ public class PeriodicTaskSchedulerTest { public void testTaskWithInvalidInterval() throws Exception { AtomicBoolean initCalled = new AtomicBoolean(); AtomicBoolean runCalled = new AtomicBoolean(); + AtomicBoolean stopCalled = new AtomicBoolean(); List<PeriodicTask> periodicTasks = Collections.singletonList(new BasePeriodicTask("TestTask", 0L/*Invalid*/, 0L) { @Override @@ -39,6 +40,11 @@ public class PeriodicTaskSchedulerTest { } @Override + public void stop() { + stopCalled.set(true); + } + + @Override public void run() { runCalled.set(true); } @@ -51,6 +57,7 @@ public class PeriodicTaskSchedulerTest { assertFalse(initCalled.get()); assertFalse(runCalled.get()); + assertFalse(stopCalled.get()); } @Test @@ -58,6 +65,7 @@ public class PeriodicTaskSchedulerTest { int numTasks = 3; AtomicInteger numTimesInitCalled = new AtomicInteger(); AtomicInteger numTimesRunCalled = new AtomicInteger(); + AtomicInteger numTimesStopCalled = new AtomicInteger(); List<PeriodicTask> periodicTasks = new ArrayList<>(numTasks); for (int i = 0; i < numTasks; i++) { @@ -68,6 +76,11 @@ public class PeriodicTaskSchedulerTest { } @Override + public void stop() { + numTimesStopCalled.getAndIncrement(); + } + + @Override public void run() { numTimesRunCalled.getAndIncrement(); } @@ -81,5 +94,6 @@ public class PeriodicTaskSchedulerTest { assertEquals(numTimesInitCalled.get(), numTasks); assertEquals(numTimesRunCalled.get(), numTasks * 2); + assertEquals(numTimesStopCalled.get(), numTasks); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
