This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2fb6371 Refactor periodic task (#3819)
2fb6371 is described below
commit 2fb63714dec6bc18d2c7fe77cdf82dd88d113440
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Feb 12 17:34:33 2019 -0800
Refactor periodic task (#3819)
Refactor periodic task to fix the following issues:
- PinotTaskManager.scheduleTasks() has no effect on non-leader controller,
but returns result from previous run
The reason for this is that, we store states for each run as member
variables in the periodic task, which mixed the concept of task and run
- After calling stop() (lose leadership), when re-gaining leadership, no
start() method is called to set up the environment
- Potential race condition between start() and stop()
Replace init() with start() and call it whenever the leadership is gained
Move the basic control methods for periodic task (start(), run(), stop())
into BasePeriodicTask
Keep table level methods in ControllerPeriodicTask
For per-run states, add context generic type to pass them between
processing multiple tables
---
.../pinot/common/metrics/ControllerMeter.java | 2 +
.../controller/helix/SegmentStatusChecker.java | 93 ++++++------
.../helix/core/minion/PinotTaskManager.java | 123 +++++++--------
.../core/periodictask/ControllerPeriodicTask.java | 167 +++++++--------------
.../core/relocation/RealtimeSegmentRelocator.java | 29 +---
.../helix/core/retention/RetentionManager.java | 21 +--
.../BrokerResourceValidationManager.java | 32 ++--
.../validation/OfflineSegmentIntervalChecker.java | 22 +--
.../RealtimeSegmentValidationManager.java | 34 ++---
.../controller/helix/SegmentStatusCheckerTest.java | 20 +--
.../periodictask/ControllerPeriodicTaskTest.java | 114 ++++++--------
.../helix/core/retention/RetentionManagerTest.java | 4 +-
.../pinot/core/periodictask/BasePeriodicTask.java | 144 +++++++++++++++++-
.../pinot/core/periodictask/PeriodicTask.java | 29 +++-
.../core/periodictask/PeriodicTaskScheduler.java | 9 +-
.../periodictask/PeriodicTaskSchedulerTest.java | 34 ++---
.../tests/SegmentCompletionIntegrationTests.java | 2 +-
17 files changed, 431 insertions(+), 448 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index 9d522bc..9f9299e 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -56,6 +56,8 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
// Introducing a new stream agnostic metric to replace LLC_KAFKA_DATA_LOSS.
// We can phase out LLC_KAFKA_DATA_LOSS once we have collected sufficient
metrics for the new one
LLC_STREAM_DATA_LOSS("dataLoss", false),
+ CONTROLLER_PERIODIC_TASK_RUN("periodicTaskRun", false),
+ CONTROLLER_PERIODIC_TASK_ERROR("periodicTaskError", false),
NUMBER_TIMES_SCHEDULE_TASKS_CALLED("tasks", true),
NUMBER_TASKS_SUBMITTED("tasks", false),
NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED("SegmentUploadTimeouts", true),
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index cbe763c..f40e2b3 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
* Manages the segment status metrics, regarding tables with fewer replicas
than requested
* and segments in error state.
*/
-public class SegmentStatusChecker extends ControllerPeriodicTask {
+public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusChecker.Context> {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentStatusChecker.class);
private static final int MaxOfflineSegmentsToLog = 5;
public static final String ONLINE = "ONLINE";
@@ -50,10 +50,6 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask {
// log messages about disabled tables atmost once a day
private static final long DISABLED_TABLE_LOG_INTERVAL_MS =
TimeUnit.DAYS.toMillis(1);
private long _lastDisabledTableLogTimestamp = 0;
- private boolean _logDisabledTables;
- private int _realTimeTableCount;
- private int _offlineTableCount;
- private int _disabledTableCount;
/**
* Constructs the segment status checker.
@@ -69,58 +65,50 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask {
}
@Override
- public void initTask() {
+ protected void setUpTask() {
LOGGER.info("Initializing table metrics for all the tables.");
setStatusToDefault();
}
@Override
- protected void preprocess() {
- _realTimeTableCount = 0;
- _offlineTableCount = 0;
- _disabledTableCount = 0;
-
+ protected Context preprocess() {
+ Context context = new Context();
// check if we need to log disabled tables log messages
long now = System.currentTimeMillis();
if (now - _lastDisabledTableLogTimestamp >=
DISABLED_TABLE_LOG_INTERVAL_MS) {
- _logDisabledTables = true;
+ context._logDisabledTables = true;
_lastDisabledTableLogTimestamp = now;
- } else {
- _logDisabledTables = false;
}
+ return context;
}
@Override
- protected void processTable(String tableNameWithType) {
- updateSegmentMetrics(tableNameWithType);
- }
-
- @Override
- protected void exceptionHandler(String tableNameWithType, Exception e) {
- LOGGER.error("Caught exception while updating segment status for table
{}", tableNameWithType, e);
- // Remove the metric for this table
- resetTableMetrics(tableNameWithType);
+ protected void processTable(String tableNameWithType, Context context) {
+ try {
+ updateSegmentMetrics(tableNameWithType, context);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while updating segment status for table
{}", tableNameWithType, e);
+ // Remove the metric for this table
+ resetTableMetrics(tableNameWithType);
+ }
}
@Override
- protected void postprocess() {
-
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT,
_realTimeTableCount);
-
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT,
_offlineTableCount);
-
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT,
_disabledTableCount);
+ protected void postprocess(Context context) {
+
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT,
context._realTimeTableCount);
+
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT,
context._offlineTableCount);
+
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT,
context._disabledTableCount);
}
/**
* Runs a segment status pass over the given table.
* TODO: revisit the logic and reduce the ZK access
- *
- * @param tableNameWithType
*/
- private void updateSegmentMetrics(String tableNameWithType) {
-
+ private void updateSegmentMetrics(String tableNameWithType, Context context)
{
if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) ==
TableType.OFFLINE) {
- _offlineTableCount++;
+ context._offlineTableCount++;
} else {
- _realTimeTableCount++;
+ context._realTimeTableCount++;
}
IdealState idealState =
_pinotHelixResourceManager.getTableIdealState(tableNameWithType);
@@ -132,11 +120,11 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask {
}
if (!idealState.isEnabled()) {
- if (_logDisabledTables) {
+ if (context._logDisabledTables) {
LOGGER.warn("Table {} is disabled. Skipping segment status checks",
tableNameWithType);
}
resetTableMetrics(tableNameWithType);
- _disabledTableCount++;
+ context._disabledTableCount++;
return;
}
@@ -147,16 +135,16 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask {
} catch (NumberFormatException e) {
// Ignore
}
- _metricsRegistry
+ _controllerMetrics
.setValueOfTableGauge(tableNameWithType,
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
- _metricsRegistry.setValueOfTableGauge(tableNameWithType,
ControllerGauge.PERCENT_OF_REPLICAS, 100);
- _metricsRegistry.setValueOfTableGauge(tableNameWithType,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.PERCENT_OF_REPLICAS, 100);
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
return;
}
- _metricsRegistry
+ _controllerMetrics
.setValueOfTableGauge(tableNameWithType,
ControllerGauge.IDEALSTATE_ZNODE_SIZE, idealState.toString().length());
- _metricsRegistry.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENT_COUNT,
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENT_COUNT,
(long) (idealState.getPartitionSet().size()));
ExternalView externalView =
_pinotHelixResourceManager.getTableExternalView(tableNameWithType);
@@ -230,11 +218,11 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask {
nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
}
// Synchronization provided by Controller Gauge to make sure that only one
thread updates the gauge
- _metricsRegistry.setValueOfTableGauge(tableNameWithType,
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
- _metricsRegistry.setValueOfTableGauge(tableNameWithType,
ControllerGauge.PERCENT_OF_REPLICAS,
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.PERCENT_OF_REPLICAS,
(nReplicasIdealMax > 0) ? (nReplicasExternal * 100 /
nReplicasIdealMax) : 100);
- _metricsRegistry.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
- _metricsRegistry.setValueOfTableGauge(tableNameWithType,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
(nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
if (nOffline > 0) {
LOGGER.warn("Table {} has {} segments with no online replicas",
tableNameWithType, nOffline);
@@ -254,15 +242,22 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask {
}
private void resetTableMetrics(String tableName) {
- _metricsRegistry.setValueOfTableGauge(tableName,
ControllerGauge.NUMBER_OF_REPLICAS, Long.MIN_VALUE);
- _metricsRegistry.setValueOfTableGauge(tableName,
ControllerGauge.PERCENT_OF_REPLICAS, Long.MIN_VALUE);
- _metricsRegistry.setValueOfTableGauge(tableName,
ControllerGauge.SEGMENTS_IN_ERROR_STATE, Long.MIN_VALUE);
- _metricsRegistry.setValueOfTableGauge(tableName,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, Long.MIN_VALUE);
+ _controllerMetrics.setValueOfTableGauge(tableName,
ControllerGauge.NUMBER_OF_REPLICAS, Long.MIN_VALUE);
+ _controllerMetrics.setValueOfTableGauge(tableName,
ControllerGauge.PERCENT_OF_REPLICAS, Long.MIN_VALUE);
+ _controllerMetrics.setValueOfTableGauge(tableName,
ControllerGauge.SEGMENTS_IN_ERROR_STATE, Long.MIN_VALUE);
+ _controllerMetrics.setValueOfTableGauge(tableName,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, Long.MIN_VALUE);
}
@Override
- public void stopTask() {
+ public void cleanUpTask() {
LOGGER.info("Resetting table metrics for all the tables.");
setStatusToDefault();
}
+
+ public static final class Context {
+ private boolean _logDisabledTables;
+ private int _realTimeTableCount;
+ private int _offlineTableCount;
+ private int _disabledTableCount;
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index a674fe0..6bb5c9d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import javax.annotation.Nonnull;
import org.apache.pinot.common.config.PinotTaskConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableTaskConfig;
@@ -44,21 +43,16 @@ import org.slf4j.LoggerFactory;
* <p><code>PinotTaskManager</code> is also responsible for checking the
health status on each type of tasks, detect and
* fix issues accordingly.
*/
-public class PinotTaskManager extends ControllerPeriodicTask {
+public class PinotTaskManager extends ControllerPeriodicTask<Void> {
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotTaskManager.class);
private final PinotHelixTaskResourceManager _helixTaskResourceManager;
private final ClusterInfoProvider _clusterInfoProvider;
private final TaskGeneratorRegistry _taskGeneratorRegistry;
- private Map<String, List<TableConfig>> _enabledTableConfigMap;
- private Set<String> _taskTypes;
- private int _numTaskTypes;
- private Map<String, String> _tasksScheduled;
-
- public PinotTaskManager(@Nonnull PinotHelixTaskResourceManager
helixTaskResourceManager,
- @Nonnull PinotHelixResourceManager helixResourceManager, @Nonnull
ControllerConf controllerConf,
- @Nonnull ControllerMetrics controllerMetrics) {
+ public PinotTaskManager(PinotHelixTaskResourceManager
helixTaskResourceManager,
+ PinotHelixResourceManager helixResourceManager, ControllerConf
controllerConf,
+ ControllerMetrics controllerMetrics) {
super("PinotTaskManager",
controllerConf.getTaskManagerFrequencyInSeconds(),
controllerConf.getPeriodicTaskInitialDelayInSeconds(),
helixResourceManager, controllerMetrics);
_helixTaskResourceManager = helixTaskResourceManager;
@@ -66,110 +60,105 @@ public class PinotTaskManager extends
ControllerPeriodicTask {
_taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
}
- @Override
- protected void initTask() {
-
- }
-
/**
- * Get the cluster info provider.
- * <p>Cluster info provider might be needed to initialize task generators.
+ * Returns the cluster info provider.
+ * <p>
+ * Cluster info provider might be useful when initializing task generators.
*
* @return Cluster info provider
*/
- @Nonnull
public ClusterInfoProvider getClusterInfoProvider() {
return _clusterInfoProvider;
}
/**
- * Register a task generator.
- * <p>This is for pluggable task generators.
+ * Registers a task generator.
+ * <p>
+ * This method can be used to plug in custom task generators.
*
* @param pinotTaskGenerator Task generator to be registered
*/
- public void registerTaskGenerator(@Nonnull PinotTaskGenerator
pinotTaskGenerator) {
+ public void registerTaskGenerator(PinotTaskGenerator pinotTaskGenerator) {
_taskGeneratorRegistry.registerTaskGenerator(pinotTaskGenerator);
}
/**
* Public API to schedule tasks. It doesn't matter whether current pinot
controller is leader.
*/
- public Map<String, String> scheduleTasks() {
- process(_pinotHelixResourceManager.getAllTables());
- return getTasksScheduled();
+ public synchronized Map<String, String> scheduleTasks() {
+ Map<String, String> tasksScheduled =
scheduleTasks(_pinotHelixResourceManager.getAllTables());
+
+ // NOTE: this method might be called from the Rest API instead of the
periodic task scheduler on non-leader
+ // controllers, so if the task is stopped (non-leader controller), clean
up the task
+ if (!isStarted()) {
+ cleanUpTask();
+ }
+
+ return tasksScheduled;
}
- @Override
- protected void preprocess() {
-
_metricsRegistry.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
1L);
+ /**
+ * Check the Pinot cluster status and schedule new tasks for the given
tables.
+ *
+ * @param tableNamesWithType List of table names with type suffix
+ * @return Map from task type to task scheduled
+ */
+ private Map<String, String> scheduleTasks(List<String> tableNamesWithType) {
+
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
1L);
- _taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
- _numTaskTypes = _taskTypes.size();
- _enabledTableConfigMap = new HashMap<>(_numTaskTypes);
+ Set<String> taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
+ int numTaskTypes = taskTypes.size();
+ Map<String, List<TableConfig>> enabledTableConfigMap = new
HashMap<>(numTaskTypes);
- for (String taskType : _taskTypes) {
- _enabledTableConfigMap.put(taskType, new ArrayList<>());
+ for (String taskType : taskTypes) {
+ enabledTableConfigMap.put(taskType, new ArrayList<>());
// Ensure all task queues exist
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
}
- }
- @Override
- protected void processTable(String tableNameWithType) {
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig != null) {
- TableTaskConfig taskConfig = tableConfig.getTaskConfig();
- if (taskConfig != null) {
- for (String taskType : _taskTypes) {
- if (taskConfig.isTaskTypeEnabled(taskType)) {
- _enabledTableConfigMap.get(taskType).add(tableConfig);
+ // Scan all table configs to get the tables with tasks enabled
+ for (String tableNameWithType : tableNamesWithType) {
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig != null) {
+ TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+ if (taskConfig != null) {
+ for (String taskType : taskTypes) {
+ if (taskConfig.isTaskTypeEnabled(taskType)) {
+ enabledTableConfigMap.get(taskType).add(tableConfig);
+ }
}
}
}
}
- }
-
- @Override
- protected void exceptionHandler(String tableNameWithType, Exception e) {
- LOGGER.error("Exception in PinotTaskManager for table {}",
tableNameWithType, e);
- }
- @Override
- protected void postprocess() {
// Generate each type of tasks
- _tasksScheduled = new HashMap<>(_numTaskTypes);
- for (String taskType : _taskTypes) {
+ Map<String, String> tasksScheduled = new HashMap<>(numTaskTypes);
+ for (String taskType : taskTypes) {
LOGGER.info("Generating tasks for task type: {}", taskType);
PinotTaskGenerator pinotTaskGenerator =
_taskGeneratorRegistry.getTaskGenerator(taskType);
- List<PinotTaskConfig> pinotTaskConfigs =
pinotTaskGenerator.generateTasks(_enabledTableConfigMap.get(taskType));
+ List<PinotTaskConfig> pinotTaskConfigs =
pinotTaskGenerator.generateTasks(enabledTableConfigMap.get(taskType));
int numTasks = pinotTaskConfigs.size();
if (numTasks > 0) {
LOGGER
.info("Submitting {} tasks for task type: {} with task configs:
{}", numTasks, taskType, pinotTaskConfigs);
- _tasksScheduled.put(taskType, _helixTaskResourceManager
+ tasksScheduled.put(taskType, _helixTaskResourceManager
.submitTask(pinotTaskConfigs,
pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
- _metricsRegistry.addMeteredTableValue(taskType,
ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
+ _controllerMetrics.addMeteredTableValue(taskType,
ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
}
}
+
+ return tasksScheduled;
}
- /**
- * Returns the tasks that have been scheduled as part of the postprocess
- * @return
- */
- private Map<String, String> getTasksScheduled() {
- return _tasksScheduled;
+ @Override
+ protected void processTables(List<String> tableNamesWithType) {
+ scheduleTasks(tableNamesWithType);
}
- /**
- * Performs necessary cleanups (e.g. remove metrics) when the controller
leadership changes.
- */
@Override
- public void stopTask() {
- LOGGER.info("Perform task cleanups.");
- // Performs necessary cleanups for each task type.
+ public void cleanUpTask() {
+ LOGGER.info("Cleaning up all task generators");
for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
_taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 8d15d1e..7e764f3 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -18,9 +18,10 @@
*/
package org.apache.pinot.controller.helix.core.periodictask;
-import com.google.common.annotations.VisibleForTesting;
import java.util.List;
+import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.periodictask.BasePeriodicTask;
@@ -31,152 +32,96 @@ import org.slf4j.LoggerFactory;
/**
* The base periodic task for pinot controller only. It uses
<code>PinotHelixResourceManager</code> to determine
* which table resources should be managed by this Pinot controller.
+ *
+ * @param <C> the context type
*/
-public abstract class ControllerPeriodicTask extends BasePeriodicTask {
+@ThreadSafe
+public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(ControllerPeriodicTask.class);
- private static final long MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS =
30_000L;
-
protected final PinotHelixResourceManager _pinotHelixResourceManager;
- protected final ControllerMetrics _metricsRegistry;
-
- private volatile boolean _stopPeriodicTask;
- private volatile boolean _periodicTaskInProgress;
+ protected final ControllerMetrics _controllerMetrics;
public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
long initialDelayInSeconds,
PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics
controllerMetrics) {
super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
_pinotHelixResourceManager = pinotHelixResourceManager;
- _metricsRegistry = controllerMetrics;
+ _controllerMetrics = controllerMetrics;
}
- /**
- * Reset flags, and call initTask which initializes each individual task
- */
@Override
- public final void init() {
- _stopPeriodicTask = false;
- _periodicTaskInProgress = false;
- initTask();
+ protected final void runTask() {
+ _controllerMetrics.addMeteredTableValue(_taskName,
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L);
+ try {
+ processTables(_pinotHelixResourceManager.getAllTables());
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while running task: {}", _taskName, e);
+ _controllerMetrics.addMeteredTableValue(_taskName,
ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L);
+ }
}
/**
- * Execute the ControllerPeriodicTask.
- * The _periodicTaskInProgress is enabled at the beginning and disabled
before exiting,
- * to ensure that we can wait for a task in progress to finish when stop has
been invoked
+ * Processes the given list of tables, and returns the number of tables
processed.
+ * <p>
+ * Override one of this method, {@link #processTable(String)} or {@link
#processTable(String, C)}.
*/
- @Override
- public final void run() {
- _stopPeriodicTask = false;
- _periodicTaskInProgress = true;
-
- List<String> tableNamesWithType =
_pinotHelixResourceManager.getAllTables();
- long startTime = System.currentTimeMillis();
+ protected void processTables(List<String> tableNamesWithType) {
int numTables = tableNamesWithType.size();
-
- LOGGER.info("Start processing {} tables in periodic task: {}", numTables,
getTaskName());
- process(tableNamesWithType);
- LOGGER.info("Finish processing {} tables in periodic task: {} in {}ms",
numTables, getTaskName(),
- (System.currentTimeMillis() - startTime));
-
- _periodicTaskInProgress = false;
- }
-
- /**
- * Stops the ControllerPeriodicTask by enabling the _stopPeriodicTask flag.
The flag ensures that processing of no new table begins.
- * This method waits for the in progress ControllerPeriodicTask to finish
the table being processed, until MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS
- * Finally, it invokes the stopTask for any specific cleanup at the
individual task level
- */
- @Override
- public final void stop() {
- _stopPeriodicTask = true;
-
- LOGGER.info("Waiting for periodic task {} to finish, maxWaitTimeMillis =
{}", getTaskName(),
- MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS);
- long millisToWait = MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS;
- while (_periodicTaskInProgress && millisToWait > 0) {
- try {
- long thisWait = 1000;
- if (millisToWait < thisWait) {
- thisWait = millisToWait;
- }
- Thread.sleep(thisWait);
- millisToWait -= thisWait;
- } catch (InterruptedException e) {
- LOGGER.info("Interrupted: Remaining wait time {} (out of {}) for task
{}", millisToWait,
- MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS, getTaskName());
+ LOGGER.info("Processing {} tables in task: {}", numTables, _taskName);
+ C context = preprocess();
+ int numTablesProcessed = 0;
+ for (String tableNameWithType : tableNamesWithType) {
+ if (!isStarted()) {
+ LOGGER.info("Task: {} is stopped, early terminate the task",
_taskName);
break;
}
- }
- LOGGER.info("Wait completed for task {}. Waited for {} ms.
_periodicTaskInProgress = {}", getTaskName(),
- MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS - millisToWait,
_periodicTaskInProgress);
-
- stopTask();
- }
-
- /**
- * Processes the task on the given tables.
- *
- * @param tableNamesWithType List of table names
- */
- protected void process(List<String> tableNamesWithType) {
- if (!shouldStopPeriodicTask()) {
-
- int numTablesProcessed = 0;
- preprocess();
-
- for (String tableNameWithType : tableNamesWithType) {
- if (shouldStopPeriodicTask()) {
- LOGGER.info("Skip processing table {} and all the remaining tables
for task {}.", tableNameWithType,
- getTaskName());
- break;
- }
- try {
- processTable(tableNameWithType);
- numTablesProcessed++;
- } catch (Exception e) {
- exceptionHandler(tableNameWithType, e);
- }
+ try {
+ processTable(tableNameWithType, context);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while processing table: {} in task:
{}", tableNamesWithType, _taskName, e);
}
-
- postprocess();
- _metricsRegistry
-
.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
getTaskName(), numTablesProcessed);
- } else {
- LOGGER.info("Skip processing all tables for task {}", getTaskName());
+ numTablesProcessed++;
}
+ postprocess(context);
+ _controllerMetrics
+
.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
_taskName, numTablesProcessed);
+ LOGGER.info("Finish processing {}/{} tables in task: {}",
numTablesProcessed, numTables, _taskName);
}
/**
- * This method runs before processing all tables
+ * Can be overridden to provide context before processing the tables.
*/
- protected abstract void preprocess();
+ protected C preprocess() {
+ return null;
+ }
/**
- * Execute the controller periodic task for the given table
- * @param tableNameWithType
+ * Processes the given table.
+ * <p>
+ * Override one of this method, {@link #processTable(String)} or {@link
#processTables(List)}.
*/
- protected abstract void processTable(String tableNameWithType);
+ protected void processTable(String tableNameWithType, C context) {
+ processTable(tableNameWithType);
+ }
/**
- * This method runs after processing all tables
+ * Processes the given table.
+ * <p>
+ * Override one of this method, {@link #processTable(String, C)} or {@link
#processTables(List)}.
*/
- protected abstract void postprocess();
-
- protected abstract void exceptionHandler(String tableNameWithType, Exception
e);
-
- @VisibleForTesting
- protected boolean shouldStopPeriodicTask() {
- return _stopPeriodicTask;
+ protected void processTable(String tableNameWithType) {
}
/**
- * Initialize the ControllerPeriodicTask, to be defined by each individual
task
+ * Can be overridden to perform cleanups after processing the tables.
*/
- protected abstract void initTask();
+ protected void postprocess(C context) {
+ postprocess();
+ }
/**
- * Perform cleanup for the ControllerPeriodicTask, to be defined by each
individual task
+ * Can be overridden to perform cleanups after processing the tables.
*/
- protected abstract void stopTask();
+ protected void postprocess() {
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index 203b245..0a9e352 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -34,7 +34,6 @@ import org.apache.pinot.common.config.RealtimeTagConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.retry.RetryPolicies;
import org.apache.pinot.common.utils.time.TimeUtils;
@@ -53,7 +52,7 @@ import org.slf4j.LoggerFactory;
* We only relocate segments for realtime tables, and only if tenant config
indicates that relocation is required
* A segment will be relocated, one replica at a time, once all of its
replicas are in ONLINE state and all/some are on servers other than completed
servers
*/
-public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
+public class RealtimeSegmentRelocator extends ControllerPeriodicTask<Void> {
private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeSegmentRelocator.class);
public RealtimeSegmentRelocator(PinotHelixResourceManager
pinotHelixResourceManager, ControllerConf config,
@@ -63,31 +62,12 @@ public class RealtimeSegmentRelocator extends
ControllerPeriodicTask {
}
@Override
- protected void initTask() {
-
- }
-
- @Override
- protected void preprocess() {
- }
-
- @Override
protected void processTable(String tableNameWithType) {
- CommonConstants.Helix.TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if (tableType == CommonConstants.Helix.TableType.REALTIME) {
+ if (TableNameBuilder.REALTIME.tableHasTypeSuffix(tableNameWithType)) {
runRelocation(tableNameWithType);
}
}
- @Override
- protected void postprocess() {
- }
-
- @Override
- protected void exceptionHandler(String tableNameWithType, Exception e) {
- LOGGER.error("Exception in relocating realtime segments of table {}",
tableNameWithType, e);
- }
-
/**
* Check the given table. Perform relocation of segments if table is
realtime and relocation is required
* TODO: Model this to implement {@link
org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy}
interface
@@ -277,9 +257,4 @@ public class RealtimeSegmentRelocator extends
ControllerPeriodicTask {
}
return seconds;
}
-
- @Override
- public void stopTask() {
-
- }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index b8059e9..b3abf07 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
* The <code>RetentionManager</code> class manages retention for all segments
and delete expired segments.
* <p>It is scheduled to run only on leader controller.
*/
-public class RetentionManager extends ControllerPeriodicTask {
+public class RetentionManager extends ControllerPeriodicTask<Void> {
public static final long OLD_LLC_SEGMENTS_RETENTION_IN_MILLIS =
TimeUnit.DAYS.toMillis(5L);
private static final Logger LOGGER =
LoggerFactory.getLogger(RetentionManager.class);
@@ -65,15 +65,6 @@ public class RetentionManager extends ControllerPeriodicTask
{
}
@Override
- protected void initTask() {
-
- }
-
- @Override
- protected void preprocess() {
- }
-
- @Override
protected void processTable(String tableNameWithType) {
LOGGER.info("Start managing retention for table: {}", tableNameWithType);
manageRetentionForTable(tableNameWithType);
@@ -85,11 +76,6 @@ public class RetentionManager extends ControllerPeriodicTask
{
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
}
- @Override
- protected void exceptionHandler(String tableNameWithType, Exception e) {
- LOGGER.error("Caught exception while managing retention for table: {}",
tableNameWithType, e);
- }
-
private void manageRetentionForTable(String tableNameWithType) {
// Build retention strategy from table config
@@ -190,9 +176,4 @@ public class RetentionManager extends
ControllerPeriodicTask {
.contains(CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel.OFFLINE);
}
}
-
- @Override
- public void stopTask() {
-
- }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index 85dd917..8f7e1c0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -33,11 +33,9 @@ import org.slf4j.LoggerFactory;
/**
* Rebuilds the broker resource if the instance set has changed
*/
-public class BrokerResourceValidationManager extends ControllerPeriodicTask {
+public class BrokerResourceValidationManager extends
ControllerPeriodicTask<BrokerResourceValidationManager.Context> {
private static final Logger LOGGER =
LoggerFactory.getLogger(BrokerResourceValidationManager.class);
- private List<InstanceConfig> _instanceConfigs;
-
public BrokerResourceValidationManager(ControllerConf config,
PinotHelixResourceManager pinotHelixResourceManager,
ControllerMetrics controllerMetrics) {
super("BrokerResourceValidationManager",
config.getBrokerResourceValidationFrequencyInSeconds(),
@@ -45,12 +43,14 @@ public class BrokerResourceValidationManager extends
ControllerPeriodicTask {
}
@Override
- protected void preprocess() {
- _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
+ protected Context preprocess() {
+ Context context = new Context();
+ context._instanceConfigs =
_pinotHelixResourceManager.getAllHelixInstanceConfigs();
+ return context;
}
@Override
- protected void processTable(String tableNameWithType) {
+ protected void processTable(String tableNameWithType, Context context) {
TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
if (tableConfig == null) {
LOGGER.warn("Failed to find table config for table: {}, skipping broker
resource validation", tableNameWithType);
@@ -59,25 +59,11 @@ public class BrokerResourceValidationManager extends
ControllerPeriodicTask {
// Rebuild broker resource
Set<String> brokerInstances = _pinotHelixResourceManager
- .getAllInstancesForBrokerTenant(_instanceConfigs,
tableConfig.getTenantConfig().getBroker());
+ .getAllInstancesForBrokerTenant(context._instanceConfigs,
tableConfig.getTenantConfig().getBroker());
_pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType,
brokerInstances);
}
- @Override
- protected void postprocess() {
- }
-
- @Override
- protected void exceptionHandler(String tableNameWithType, Exception e) {
- LOGGER.error("Caught exception while validating broker resource for table:
{}", tableNameWithType, e);
- }
-
- @Override
- protected void initTask() {
-
- }
-
- @Override
- public void stopTask() {
+ public static final class Context {
+ private List<InstanceConfig> _instanceConfigs;
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 3577813..2646ddd 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
* Manages the segment validation metrics, to ensure that all offline segments
are contiguous (no missing segments) and
* that the offline push delay isn't too high.
*/
-public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
+public class OfflineSegmentIntervalChecker extends
ControllerPeriodicTask<Void> {
private static final Logger LOGGER =
LoggerFactory.getLogger(OfflineSegmentIntervalChecker.class);
private final ValidationMetrics _validationMetrics;
@@ -56,10 +56,6 @@ public class OfflineSegmentIntervalChecker extends
ControllerPeriodicTask {
}
@Override
- protected void preprocess() {
- }
-
- @Override
protected void processTable(String tableNameWithType) {
CommonConstants.Helix.TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
@@ -206,21 +202,7 @@ public class OfflineSegmentIntervalChecker extends
ControllerPeriodicTask {
}
@Override
- protected void postprocess() {
- }
-
- @Override
- protected void exceptionHandler(String tableNameWithType, Exception e) {
- LOGGER.warn("Caught exception while checking offline segment intervals for
table: {}", tableNameWithType, e);
- }
-
- @Override
- protected void initTask() {
-
- }
-
- @Override
- public void stopTask() {
+ public void cleanUpTask() {
LOGGER.info("Unregister all the validation metrics.");
_validationMetrics.unregisterAllMetrics();
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index a43d63b..6c0b59f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
/**
* Validates realtime ideal states and segment metadata, fixing any partitions
which have stopped consuming
*/
-public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
+public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<RealtimeSegmentValidationManager.Context> {
private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeSegmentValidationManager.class);
private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
@@ -51,7 +51,6 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask {
private final int _segmentLevelValidationIntervalInSeconds;
private long _lastUpdateRealtimeDocumentCountTimeMs = 0L;
- private boolean _updateRealtimeDocumentCount;
public RealtimeSegmentValidationManager(ControllerConf config,
PinotHelixResourceManager pinotHelixResourceManager,
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
ValidationMetrics validationMetrics,
@@ -66,20 +65,21 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask {
}
@Override
- protected void preprocess() {
+ protected Context preprocess() {
+ Context context = new Context();
// Update realtime document counts only if certain time has passed after
previous run
- _updateRealtimeDocumentCount = false;
long currentTimeMs = System.currentTimeMillis();
if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs -
_lastUpdateRealtimeDocumentCountTimeMs)
>= _segmentLevelValidationIntervalInSeconds) {
LOGGER.info("Run segment-level validation");
- _updateRealtimeDocumentCount = true;
+ context._updateRealtimeDocumentCount = true;
_lastUpdateRealtimeDocumentCountTimeMs = currentTimeMs;
}
+ return context;
}
@Override
- protected void processTable(String tableNameWithType) {
+ protected void processTable(String tableNameWithType, Context context) {
CommonConstants.Helix.TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
if (tableType == CommonConstants.Helix.TableType.REALTIME) {
@@ -89,7 +89,7 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask {
return;
}
- if (_updateRealtimeDocumentCount) {
+ if (context._updateRealtimeDocumentCount) {
updateRealtimeDocumentCount(tableConfig);
}
@@ -148,22 +148,12 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask {
}
@Override
- protected void postprocess() {
- }
-
- @Override
- protected void exceptionHandler(String tableNameWithType, Exception e) {
- LOGGER.error("Caught exception while validating realtime table: {}",
tableNameWithType, e);
- }
-
- @Override
- protected void initTask() {
-
- }
-
- @Override
- public void stopTask() {
+ public void cleanUpTask() {
LOGGER.info("Unregister all the validation metrics.");
_validationMetrics.unregisterAllMetrics();
}
+
+ public static final class Context {
+ private boolean _updateRealtimeDocumentCount;
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index 11b99ec..c235483 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -87,7 +87,7 @@ public class SegmentStatusCheckerTest {
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
- segmentStatusChecker.init();
+ segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(
controllerMetrics.getValueOfTableGauge(externalView.getId(),
ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1);
@@ -149,7 +149,7 @@ public class SegmentStatusCheckerTest {
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
- segmentStatusChecker.init();
+ segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(
controllerMetrics.getValueOfTableGauge(externalView.getId(),
ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
@@ -225,7 +225,7 @@ public class SegmentStatusCheckerTest {
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
- segmentStatusChecker.init();
+ segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(
controllerMetrics.getValueOfTableGauge(externalView.getId(),
ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1);
@@ -267,7 +267,7 @@ public class SegmentStatusCheckerTest {
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
- segmentStatusChecker.init();
+ segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.NUMBER_OF_REPLICAS), 0);
@@ -294,7 +294,7 @@ public class SegmentStatusCheckerTest {
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
- segmentStatusChecker.init();
+ segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.SEGMENTS_IN_ERROR_STATE),
Long.MIN_VALUE);
@@ -352,7 +352,7 @@ public class SegmentStatusCheckerTest {
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
- segmentStatusChecker.init();
+ segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(
controllerMetrics.getValueOfTableGauge(externalView.getId(),
ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
@@ -393,7 +393,7 @@ public class SegmentStatusCheckerTest {
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
- segmentStatusChecker.init();
+ segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.NUMBER_OF_REPLICAS), 1);
@@ -435,7 +435,7 @@ public class SegmentStatusCheckerTest {
// verify state before test
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT),
0);
// update metrics
- segmentStatusChecker.init();
+ segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT),
1);
}
@@ -469,7 +469,7 @@ public class SegmentStatusCheckerTest {
// verify state before test
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT),
0);
// update metrics
- segmentStatusChecker.init();
+ segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT),
1);
}
@@ -511,7 +511,7 @@ public class SegmentStatusCheckerTest {
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
- segmentStatusChecker.init();
+ segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.SEGMENTS_IN_ERROR_STATE),
Long.MIN_VALUE);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index b3c974b..a928922 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -44,30 +44,30 @@ public class ControllerPeriodicTaskTest {
private final PinotHelixResourceManager _resourceManager =
mock(PinotHelixResourceManager.class);
private final ControllerMetrics _controllerMetrics = new
ControllerMetrics(new MetricsRegistry());
+ private final AtomicBoolean _startTaskCalled = new AtomicBoolean();
private final AtomicBoolean _stopTaskCalled = new AtomicBoolean();
- private final AtomicBoolean _initTaskCalled = new AtomicBoolean();
- private final AtomicBoolean _processCalled = new AtomicBoolean();
+ private final AtomicBoolean _processTablesCalled = new AtomicBoolean();
private final AtomicInteger _tablesProcessed = new AtomicInteger();
private final int _numTables = 3;
private static final String TASK_NAME = "TestTask";
- private final MockControllerPeriodicTask _task = new
MockControllerPeriodicTask(TASK_NAME, RUN_FREQUENCY_IN_SECONDS,
+ private final ControllerPeriodicTask _task = new
ControllerPeriodicTask<Void>(TASK_NAME, RUN_FREQUENCY_IN_SECONDS,
_controllerConf.getPeriodicTaskInitialDelayInSeconds(),
_resourceManager, _controllerMetrics) {
@Override
- protected void initTask() {
- _initTaskCalled.set(true);
+ protected void setUpTask() {
+ _startTaskCalled.set(true);
}
@Override
- public void stopTask() {
+ public void cleanUpTask() {
_stopTaskCalled.set(true);
}
@Override
- public void process(List<String> tableNamesWithType) {
- _processCalled.set(true);
- super.process(tableNamesWithType);
+ public void processTables(List<String> tableNamesWithType) {
+ _processTablesCalled.set(true);
+ super.processTables(tableNamesWithType);
}
@Override
@@ -84,9 +84,9 @@ public class ControllerPeriodicTaskTest {
}
private void resetState() {
- _initTaskCalled.set(false);
+ _startTaskCalled.set(false);
_stopTaskCalled.set(false);
- _processCalled.set(false);
+ _processTablesCalled.set(false);
_tablesProcessed.set(0);
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
TASK_NAME, 0);
}
@@ -103,86 +103,72 @@ public class ControllerPeriodicTaskTest {
@Test
public void testControllerPeriodicTaskCalls() {
- // Initial state
+ // Start periodic task - leadership gained
resetState();
- _task.init();
- assertTrue(_initTaskCalled.get());
- assertFalse(_processCalled.get());
+ _task.start();
+ assertTrue(_startTaskCalled.get());
+ assertFalse(_processTablesCalled.get());
assertEquals(_tablesProcessed.get(), 0);
assertFalse(_stopTaskCalled.get());
- assertFalse(_task.shouldStopPeriodicTask());
+ assertTrue(_task.isStarted());
assertEquals(
_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
TASK_NAME), 0);
- // run task - leadership gained
+ // Run periodic task with leadership
resetState();
_task.run();
- assertFalse(_initTaskCalled.get());
- assertTrue(_processCalled.get());
+ assertFalse(_startTaskCalled.get());
+ assertTrue(_processTablesCalled.get());
assertEquals(_tablesProcessed.get(), _numTables);
assertEquals(
_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
TASK_NAME),
_numTables);
assertFalse(_stopTaskCalled.get());
- assertFalse(_task.shouldStopPeriodicTask());
+ assertTrue(_task.isStarted());
- // stop periodic task - leadership lost
+ // Stop periodic task - leadership lost
resetState();
_task.stop();
- assertFalse(_initTaskCalled.get());
- assertFalse(_processCalled.get());
+ assertFalse(_startTaskCalled.get());
+ assertFalse(_processTablesCalled.get());
assertEquals(_tablesProcessed.get(), 0);
assertEquals(
_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
TASK_NAME), 0);
assertTrue(_stopTaskCalled.get());
- assertTrue(_task.shouldStopPeriodicTask());
+ assertFalse(_task.isStarted());
- // call to run after periodic task stop invoked - leadership gained back
on same controller
+ // Run periodic task without leadership
resetState();
_task.run();
- assertFalse(_task.shouldStopPeriodicTask());
- assertFalse(_initTaskCalled.get());
- assertTrue(_processCalled.get());
+ assertFalse(_startTaskCalled.get());
+ assertFalse(_processTablesCalled.get());
+ assertEquals(_tablesProcessed.get(), 0);
+ assertEquals(
+
_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
TASK_NAME), 0);
+ assertFalse(_stopTaskCalled.get());
+ assertFalse(_task.isStarted());
+
+ // Restart periodic task - leadership re-gained
+ resetState();
+ _task.start();
+ assertTrue(_startTaskCalled.get());
+ assertFalse(_processTablesCalled.get());
+ assertEquals(_tablesProcessed.get(), 0);
+ assertFalse(_stopTaskCalled.get());
+ assertTrue(_task.isStarted());
+ assertEquals(
+
_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
TASK_NAME), 0);
+
+ // Run periodic task with leadership
+ resetState();
+ _task.run();
+ assertFalse(_startTaskCalled.get());
+ assertTrue(_processTablesCalled.get());
assertEquals(_tablesProcessed.get(), _numTables);
assertEquals(
_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
TASK_NAME),
_numTables);
assertFalse(_stopTaskCalled.get());
- }
-
- private class MockControllerPeriodicTask extends ControllerPeriodicTask {
-
- public MockControllerPeriodicTask(String taskName, long
runFrequencyInSeconds, long initialDelayInSeconds,
- PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics
controllerMetrics) {
- super(taskName, runFrequencyInSeconds, initialDelayInSeconds,
pinotHelixResourceManager, controllerMetrics);
- }
-
- @Override
- protected void initTask() {
-
- }
-
- @Override
- protected void preprocess() {
- }
-
- @Override
- protected void processTable(String tableNameWithType) {
-
- }
-
- @Override
- public void postprocess() {
- }
-
- @Override
- public void exceptionHandler(String tableNameWithType, Exception e) {
-
- }
-
- @Override
- public void stopTask() {
-
- }
+ assertTrue(_task.isStarted());
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 4fc8eca..4f34525 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -92,7 +92,7 @@ public class RetentionManagerTest {
conf.setRetentionControllerFrequencyInSeconds(0);
conf.setDeletedSegmentsRetentionInDays(0);
RetentionManager retentionManager = new
RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
- retentionManager.init();
+ retentionManager.start();
retentionManager.run();
SegmentDeletionManager deletionManager =
pinotHelixResourceManager.getSegmentDeletionManager();
@@ -215,7 +215,7 @@ public class RetentionManagerTest {
conf.setRetentionControllerFrequencyInSeconds(0);
conf.setDeletedSegmentsRetentionInDays(0);
RetentionManager retentionManager = new
RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
- retentionManager.init();
+ retentionManager.start();
retentionManager.run();
SegmentDeletionManager deletionManager =
pinotHelixResourceManager.getSegmentDeletionManager();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
index f3561bd..eb15aa9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
@@ -18,14 +18,28 @@
*/
package org.apache.pinot.core.periodictask;
+import javax.annotation.concurrent.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
/**
* A base class to implement periodic task interface.
*/
+@ThreadSafe
public abstract class BasePeriodicTask implements PeriodicTask {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BasePeriodicTask.class);
+
+ // Wait for at most 30 seconds while calling stop() for task to terminate
+ private static final long MAX_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L;
+
protected final String _taskName;
protected final long _intervalInSeconds;
protected final long _initialDelayInSeconds;
+ private volatile boolean _started;
+ private volatile boolean _running;
+
public BasePeriodicTask(String taskName, long runFrequencyInSeconds, long
initialDelayInSeconds) {
_taskName = taskName;
_intervalInSeconds = runFrequencyInSeconds;
@@ -33,6 +47,11 @@ public abstract class BasePeriodicTask implements
PeriodicTask {
}
@Override
+ public String getTaskName() {
+ return _taskName;
+ }
+
+ @Override
public long getIntervalInSeconds() {
return _intervalInSeconds;
}
@@ -42,9 +61,130 @@ public abstract class BasePeriodicTask implements
PeriodicTask {
return _initialDelayInSeconds;
}
+ /**
+ * Returns the status of the {@code started} flag. This flag will be set
after calling {@link #start()}, and reset
+ * after calling {@link #stop()}.
+ */
+ public final boolean isStarted() {
+ return _started;
+ }
+
+ /**
+ * Returns the status of the {@code running} flag. This flag will be set
during the task execution.
+ */
+ public final boolean isRunning() {
+ return _running;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This method sets {@code started} flag to true.
+ */
@Override
- public String getTaskName() {
- return _taskName;
+ public final synchronized void start() {
+ if (_started) {
+ LOGGER.warn("Task: {} is already started", _taskName);
+ return;
+ }
+ _started = true;
+
+ try {
+ setUpTask();
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while setting up task: {}", _taskName, e);
+ }
+ }
+
+ /**
+ * Can be overridden for extra task setups. This method will be called when
the periodic task starts.
+ * <p>
+ * Possible setups include adding or resetting the metric values.
+ */
+ protected void setUpTask() {
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * During the task execution, the {@code running} flag will be set.
+ */
+ @Override
+ public final void run() {
+ _running = true;
+
+ if (_started) {
+ long startTime = System.currentTimeMillis();
+ LOGGER.info("Start running task: {}", _taskName);
+ try {
+ runTask();
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while running task: {}", _taskName, e);
+ }
+ LOGGER.info("Finish running task: {} in {}ms", _taskName,
System.currentTimeMillis() - startTime);
+ } else {
+ LOGGER.warn("Task: {} is skipped because it is not started or already
stopped", _taskName);
+ }
+
+ _running = false;
+ }
+
+ /**
+ * Executes the task. This method should early terminate if {@code started}
flag is set to false by {@link #stop()}
+ * during execution.
+ */
+ protected abstract void runTask();
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This method sets {@code started} flag to false. If the task is running,
this method will block for at most 30
+ * seconds until the task finishes.
+ */
+ @Override
+ public final synchronized void stop() {
+ if (!_started) {
+ LOGGER.warn("Task: {} is not started", _taskName);
+ return;
+ }
+ _started = false;
+
+ if (_running) {
+ long startTimeMs = System.currentTimeMillis();
+ long remainingTimeMs = MAX_PERIODIC_TASK_STOP_TIME_MILLIS;
+ LOGGER.info("Task: {} is running, wait for at most {}ms for it to
finish", _taskName, remainingTimeMs);
+ while (_running && remainingTimeMs > 0L) {
+ long sleepTimeMs = Long.min(remainingTimeMs, 1000L);
+ remainingTimeMs -= sleepTimeMs;
+ try {
+ Thread.sleep(sleepTimeMs);
+ } catch (InterruptedException e) {
+ LOGGER.error("Caught InterruptedException while waiting for task: {}
to finish", _taskName);
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ long waitTimeMs = System.currentTimeMillis() - startTimeMs;
+ if (_running) {
+ LOGGER.warn("Task: {} is not finished in {}ms", waitTimeMs);
+ } else {
+ LOGGER.info("Task: {} is finished in {}ms", waitTimeMs);
+ }
+ }
+
+ try {
+ cleanUpTask();
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while cleaning up task: {}", _taskName,
e);
+ }
+ }
+
+ /**
+ * Can be overridden for extra task cleanups. This method will be called
when the periodic task stops.
+ * <p>
+ * Possible cleanups include removing or resetting the metric values.
+ */
+ protected void cleanUpTask() {
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
index da2952d..ee6c68b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTask.java
@@ -18,37 +18,50 @@
*/
package org.apache.pinot.core.periodictask;
+import javax.annotation.concurrent.ThreadSafe;
+
+
/**
* An interface to describe the functionality of periodic task. Periodic tasks
will be added to a list, scheduled
* and run in the periodic task scheduler with the fixed interval time.
*/
+@ThreadSafe
public interface PeriodicTask extends Runnable {
/**
- * Initialize the task before running the task.
+ * Returns the periodic task name.
+ * @return task name.
*/
- void init();
+ String getTaskName();
/**
- * Get the interval time of running the same task.
+ * Returns the interval time of running the same task.
* @return the interval time in seconds.
*/
long getIntervalInSeconds();
/**
- * Get the initial delay of the fist run.
+ * Returns the initial delay of the fist run.
* @return initial delay in seconds.
*/
long getInitialDelayInSeconds();
/**
- * Get the periodic task name.
- * @return task name.
+ * Performs necessary setups and starts the periodic task. Should be called
before scheduling the periodic task. Can
+ * be called after calling {@link #stop()} to restart the periodic task.
*/
- String getTaskName();
+ void start();
+
+ /**
+ * Executes the task. This method should be called only after {@link
#start()} getting called but before
+ * {@link #stop()} getting called.
+ */
+ @Override
+ void run();
/**
- * Stop the periodic task
+ * Stops the periodic task and performs necessary cleanups. Should be called
after removing the periodic task from the
+ * scheduler. Should be called after {@link #start()} getting called.
*/
void stop();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
index 054f1f8..f868ecf 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/PeriodicTaskScheduler.java
@@ -37,8 +37,7 @@ public class PeriodicTaskScheduler {
private List<PeriodicTask> _tasksWithValidInterval;
/**
- * Initialize the PeriodicTaskScheduler with list of PeriodicTasks
- * @param periodicTasks
+ * Initializes the periodic task scheduler with a list of periodic tasks.
*/
public void init(List<PeriodicTask> periodicTasks) {
_tasksWithValidInterval = new ArrayList<>();
@@ -46,7 +45,6 @@ public class PeriodicTaskScheduler {
if (periodicTask.getIntervalInSeconds() > 0) {
LOGGER.info("Adding periodic task: {}", periodicTask);
_tasksWithValidInterval.add(periodicTask);
- periodicTask.init();
} else {
LOGGER.info("Skipping periodic task: {}", periodicTask);
}
@@ -54,7 +52,7 @@ public class PeriodicTaskScheduler {
}
/**
- * Start scheduling periodic tasks.
+ * Starts scheduling periodic tasks.
*/
public synchronized void start() {
if (_executorService != null) {
@@ -67,6 +65,7 @@ public class PeriodicTaskScheduler {
LOGGER.info("Starting periodic task scheduler with tasks: {}",
_tasksWithValidInterval);
_executorService =
Executors.newScheduledThreadPool(_tasksWithValidInterval.size());
for (PeriodicTask periodicTask : _tasksWithValidInterval) {
+ periodicTask.start();
_executorService.scheduleWithFixedDelay(() -> {
try {
LOGGER.info("Starting {} with running frequency of {} seconds.",
periodicTask.getTaskName(),
@@ -83,7 +82,7 @@ public class PeriodicTaskScheduler {
}
/**
- * Shutdown executor service and stop the periodic tasks
+ * Shuts down the executor service and stops the periodic tasks.
*/
public synchronized void stop() {
if (_executorService != null) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
index acacf15..a4d581d 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/periodictask/PeriodicTaskSchedulerTest.java
@@ -34,24 +34,24 @@ public class PeriodicTaskSchedulerTest {
@Test
public void testTaskWithInvalidInterval()
throws Exception {
- AtomicBoolean initCalled = new AtomicBoolean();
+ AtomicBoolean startCalled = new AtomicBoolean();
AtomicBoolean runCalled = new AtomicBoolean();
AtomicBoolean stopCalled = new AtomicBoolean();
List<PeriodicTask> periodicTasks = Collections.singletonList(new
BasePeriodicTask("TestTask", 0L/*Invalid*/, 0L) {
@Override
- public void init() {
- initCalled.set(true);
+ protected void setUpTask() {
+ startCalled.set(true);
}
@Override
- public void stop() {
- stopCalled.set(true);
+ protected void runTask() {
+ runCalled.set(true);
}
@Override
- public void run() {
- runCalled.set(true);
+ protected void cleanUpTask() {
+ stopCalled.set(true);
}
});
@@ -61,7 +61,7 @@ public class PeriodicTaskSchedulerTest {
Thread.sleep(100L);
taskScheduler.stop();
- assertFalse(initCalled.get());
+ assertFalse(startCalled.get());
assertFalse(runCalled.get());
assertFalse(stopCalled.get());
}
@@ -70,26 +70,26 @@ public class PeriodicTaskSchedulerTest {
public void testScheduleMultipleTasks()
throws Exception {
int numTasks = 3;
- AtomicInteger numTimesInitCalled = new AtomicInteger();
+ AtomicInteger numTimesStartCalled = new AtomicInteger();
AtomicInteger numTimesRunCalled = new AtomicInteger();
AtomicInteger numTimesStopCalled = new AtomicInteger();
List<PeriodicTask> periodicTasks = new ArrayList<>(numTasks);
for (int i = 0; i < numTasks; i++) {
- periodicTasks.add(new BasePeriodicTask("Task", 1L, 0L) {
+ periodicTasks.add(new BasePeriodicTask("TestTask", 1L, 0L) {
@Override
- public void init() {
- numTimesInitCalled.getAndIncrement();
+ protected void setUpTask() {
+ numTimesStartCalled.getAndIncrement();
}
@Override
- public void stop() {
- numTimesStopCalled.getAndIncrement();
+ protected void runTask() {
+ numTimesRunCalled.getAndIncrement();
}
@Override
- public void run() {
- numTimesRunCalled.getAndIncrement();
+ protected void cleanUpTask() {
+ numTimesStopCalled.getAndIncrement();
}
});
}
@@ -100,7 +100,7 @@ public class PeriodicTaskSchedulerTest {
Thread.sleep(1100L);
taskScheduler.stop();
- assertEquals(numTimesInitCalled.get(), numTasks);
+ assertEquals(numTimesStartCalled.get(), numTasks);
assertEquals(numTimesRunCalled.get(), numTasks * 2);
assertEquals(numTimesStopCalled.get(), numTasks);
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
index 58faf3a..7e7b041 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
@@ -160,7 +160,7 @@ public class SegmentCompletionIntegrationTests extends
LLCRealtimeClusterIntegra
// Now call the validation manager, and the segment should fix itself
RealtimeSegmentValidationManager validationManager =
_controllerStarter.getRealtimeSegmentValidationManager();
- validationManager.init();
+ validationManager.start();
validationManager.run();
// Check if a new segment get into CONSUMING state
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]