This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7a1fc25 Refactor ControllerPeriodicTask to iterate over tables
(#3618)
7a1fc25 is described below
commit 7a1fc25a80ec60e7898d2069371ce995132d5a11
Author: Neha Pawar <[email protected]>
AuthorDate: Tue Dec 18 16:01:01 2018 -0800
Refactor ControllerPeriodicTask to iterate over tables (#3618)
Refactoring the tasks to pull up the iteration over tables into
ControllerPeriodicTask. This way we can have the checks just in the
ControllerPeriodicTask, and each PeriodicTask is only responsible for executing
the task on a given table.
---
.../controller/helix/SegmentStatusChecker.java | 285 +++++++++++----------
.../helix/core/minion/PinotTaskManager.java | 99 +++----
.../core/periodictask/ControllerPeriodicTask.java | 24 +-
.../core/relocation/RealtimeSegmentRelocator.java | 76 +++---
.../helix/core/retention/RetentionManager.java | 27 +-
.../controller/validation/ValidationManager.java | 89 +++----
.../periodictask/ControllerPeriodicTaskTest.java | 17 +-
7 files changed, 338 insertions(+), 279 deletions(-)
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
index 549dc9d..8059d9e 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
@@ -47,13 +47,19 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask {
public static final String ERROR = "ERROR";
public static final String CONSUMING = "CONSUMING";
private final ControllerMetrics _metricsRegistry;
- private final ControllerConf _config;
private final HelixAdmin _helixAdmin;
+ private final String _helixClusterName;
+ private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final int _waitForPushTimeSeconds;
// log messages about disabled tables at most once a day
private static final long DISABLED_TABLE_LOG_INTERVAL_MS =
TimeUnit.DAYS.toMillis(1);
private long _lastDisabledTableLogTimestamp = 0;
+ private boolean _logDisabledTables;
+ private int _realTimeTableCount;
+ private int _offlineTableCount;
+ private int _disabledTableCount;
+
/**
* Constructs the segment status checker.
@@ -64,7 +70,9 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask {
ControllerMetrics metricsRegistry) {
super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
pinotHelixResourceManager);
_helixAdmin = pinotHelixResourceManager.getHelixAdmin();
- _config = config;
+ _helixClusterName = pinotHelixResourceManager.getHelixClusterName();
+ _propertyStore = _pinotHelixResourceManager.getPropertyStore();
+
_waitForPushTimeSeconds =
config.getStatusCheckerWaitForPushTimeInSeconds();
_metricsRegistry = metricsRegistry;
}
@@ -82,166 +90,167 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask {
}
@Override
- public void process(List<String> tables) {
- updateSegmentMetrics(tables);
- }
-
- /**
- * Runs a segment status pass over the given tables.
- * TODO: revisit the logic and reduce the ZK access
- *
- * @param tables List of table names
- */
- private void updateSegmentMetrics(List<String> tables) {
- // Fetch the list of tables
- String helixClusterName = _pinotHelixResourceManager.getHelixClusterName();
- HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
- int realTimeTableCount = 0;
- int offlineTableCount = 0;
- int disabledTableCount = 0;
- ZkHelixPropertyStore<ZNRecord> propertyStore =
_pinotHelixResourceManager.getPropertyStore();
+ protected void preprocess() {
+ _realTimeTableCount = 0;
+ _offlineTableCount = 0;
+ _disabledTableCount = 0;
// check if we need to log disabled tables log messages
- boolean logDisabledTables = false;
long now = System.currentTimeMillis();
if (now - _lastDisabledTableLogTimestamp >=
DISABLED_TABLE_LOG_INTERVAL_MS) {
- logDisabledTables = true;
+ _logDisabledTables = true;
_lastDisabledTableLogTimestamp = now;
} else {
- logDisabledTables = false;
+ _logDisabledTables = false;
}
+ }
- for (String tableName : tables) {
- try {
- if (TableNameBuilder.getTableTypeFromTableName(tableName) ==
TableType.OFFLINE) {
- offlineTableCount++;
- } else {
- realTimeTableCount++;
- }
- IdealState idealState =
helixAdmin.getResourceIdealState(helixClusterName, tableName);
- if ((idealState == null) || (idealState.getPartitionSet().isEmpty())) {
- int nReplicasFromIdealState = 1;
- try {
- if (idealState != null) {
- nReplicasFromIdealState =
Integer.valueOf(idealState.getReplicas());
- }
- } catch (NumberFormatException e) {
- // Ignore
+ @Override
+ protected void processTable(String tableNameWithType) {
+ updateSegmentMetrics(tableNameWithType);
+ }
+
+ @Override
+ protected void postprocess() {
+
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT,
_realTimeTableCount);
+
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT,
_offlineTableCount);
+
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT,
_disabledTableCount);
+ }
+
+ /**
+ * Runs a segment status pass over the given table.
+ * TODO: revisit the logic and reduce the ZK access
+ *
+ * @param tableNameWithType
+ */
+ private void updateSegmentMetrics(String tableNameWithType) {
+
+ try {
+ if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) ==
TableType.OFFLINE) {
+ _offlineTableCount++;
+ } else {
+ _realTimeTableCount++;
+ }
+ IdealState idealState =
_helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
+ if ((idealState == null) || (idealState.getPartitionSet().isEmpty())) {
+ int nReplicasFromIdealState = 1;
+ try {
+ if (idealState != null) {
+ nReplicasFromIdealState =
Integer.valueOf(idealState.getReplicas());
}
- _metricsRegistry.setValueOfTableGauge(tableName,
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
- _metricsRegistry.setValueOfTableGauge(tableName,
ControllerGauge.PERCENT_OF_REPLICAS, 100);
- _metricsRegistry.setValueOfTableGauge(tableName,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
- continue;
+ } catch (NumberFormatException e) {
+ // Ignore
}
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType,
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType,
ControllerGauge.PERCENT_OF_REPLICAS, 100);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
+ return;
+ }
- if (!idealState.isEnabled()) {
- if (logDisabledTables) {
- LOGGER.warn("Table {} is disabled. Skipping segment status
checks", tableName);
- }
- resetTableMetrics(tableName);
- disabledTableCount++;
- continue;
+ if (!idealState.isEnabled()) {
+ if (_logDisabledTables) {
+ LOGGER.warn("Table {} is disabled. Skipping segment status checks",
tableNameWithType);
}
+ resetTableMetrics(tableNameWithType);
+ _disabledTableCount++;
+ return;
+ }
- _metricsRegistry.setValueOfTableGauge(tableName,
ControllerGauge.IDEALSTATE_ZNODE_SIZE,
- idealState.toString().length());
- _metricsRegistry.setValueOfTableGauge(tableName,
ControllerGauge.SEGMENT_COUNT,
- (long) (idealState.getPartitionSet().size()));
- ExternalView externalView =
helixAdmin.getResourceExternalView(helixClusterName, tableName);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType,
ControllerGauge.IDEALSTATE_ZNODE_SIZE,
+ idealState.toString().length());
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENT_COUNT,
+ (long) (idealState.getPartitionSet().size()));
+ ExternalView externalView =
_helixAdmin.getResourceExternalView(_helixClusterName, tableNameWithType);
- int nReplicasIdealMax = 0; // Keeps track of maximum number of
replicas in ideal state
- int nReplicasExternal = -1; // Keeps track of minimum number of
replicas in external view
- int nErrors = 0; // Keeps track of number of segments in error state
- int nOffline = 0; // Keeps track of number segments with no online
replicas
- int nSegments = 0; // Counts number of segments
- for (String partitionName : idealState.getPartitionSet()) {
- int nReplicas = 0;
- int nIdeal = 0;
- nSegments++;
- // Skip segments not online in ideal state
- for (Map.Entry<String, String> serverAndState :
idealState.getInstanceStateMap(partitionName).entrySet()) {
- if (serverAndState == null) {
- break;
- }
- if (serverAndState.getValue().equals(ONLINE)) {
- nIdeal++;
- break;
- }
- }
- if (nIdeal == 0) {
- // No online segments in ideal state
- continue;
+ int nReplicasIdealMax = 0; // Keeps track of maximum number of replicas
in ideal state
+ int nReplicasExternal = -1; // Keeps track of minimum number of replicas
in external view
+ int nErrors = 0; // Keeps track of number of segments in error state
+ int nOffline = 0; // Keeps track of number segments with no online
replicas
+ int nSegments = 0; // Counts number of segments
+ for (String partitionName : idealState.getPartitionSet()) {
+ int nReplicas = 0;
+ int nIdeal = 0;
+ nSegments++;
+ // Skip segments not online in ideal state
+ for (Map.Entry<String, String> serverAndState :
idealState.getInstanceStateMap(partitionName).entrySet()) {
+ if (serverAndState == null) {
+ break;
}
- nReplicasIdealMax =
(idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax)
- ? idealState.getInstanceStateMap(partitionName).size() :
nReplicasIdealMax;
- if ((externalView == null) ||
(externalView.getStateMap(partitionName) == null)) {
- // No replicas for this segment
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
- if ((tableType != null) && (tableType.equals(TableType.OFFLINE))) {
- OfflineSegmentZKMetadata segmentZKMetadata =
-
ZKMetadataProvider.getOfflineSegmentZKMetadata(propertyStore, tableName,
partitionName);
- if (segmentZKMetadata != null
- && segmentZKMetadata.getPushTime() >
System.currentTimeMillis() - _waitForPushTimeSeconds * 1000) {
- // push not yet finished, skip
- continue;
- }
- }
- nOffline++;
- if (nOffline < MaxOfflineSegmentsToLog) {
- LOGGER.warn("Segment {} of table {} has no replicas",
partitionName, tableName);
- }
- nReplicasExternal = 0;
- continue;
+ if (serverAndState.getValue().equals(ONLINE)) {
+ nIdeal++;
+ break;
}
- for (Map.Entry<String, String> serverAndState :
externalView.getStateMap(partitionName).entrySet()) {
- // Count number of online replicas. Ignore if state is CONSUMING.
- // It is possible for a segment to be ONLINE in idealstate, and
CONSUMING in EV for a short period of time.
- // So, ignore this combination. If a segment exists in this
combination for a long time, we will get
- // low level-partition-not-consuming alert anyway.
- if (serverAndState.getValue().equals(ONLINE) ||
serverAndState.getValue().equals(CONSUMING)) {
- nReplicas++;
- }
- if (serverAndState.getValue().equals(ERROR)) {
- nErrors++;
+ }
+ if (nIdeal == 0) {
+ // No online segments in ideal state
+ continue;
+ }
+ nReplicasIdealMax =
+ (idealState.getInstanceStateMap(partitionName).size() >
nReplicasIdealMax) ? idealState.getInstanceStateMap(
+ partitionName).size() : nReplicasIdealMax;
+ if ((externalView == null) || (externalView.getStateMap(partitionName)
== null)) {
+ // No replicas for this segment
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if ((tableType != null) && (tableType.equals(TableType.OFFLINE))) {
+ OfflineSegmentZKMetadata segmentZKMetadata =
+ ZKMetadataProvider.getOfflineSegmentZKMetadata(_propertyStore,
tableNameWithType, partitionName);
+ if (segmentZKMetadata != null
+ && segmentZKMetadata.getPushTime() >
System.currentTimeMillis() - _waitForPushTimeSeconds * 1000) {
+ // push not yet finished, skip
+ continue;
}
}
- if (nReplicas == 0) {
- if (nOffline < MaxOfflineSegmentsToLog) {
- LOGGER.warn("Segment {} of table {} has no online replicas",
partitionName, tableName);
- }
- nOffline++;
+ nOffline++;
+ if (nOffline < MaxOfflineSegmentsToLog) {
+ LOGGER.warn("Segment {} of table {} has no replicas",
partitionName, tableNameWithType);
}
- nReplicasExternal =
- ((nReplicasExternal > nReplicas) || (nReplicasExternal == -1)) ?
nReplicas : nReplicasExternal;
- }
- if (nReplicasExternal == -1) {
- nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
+ nReplicasExternal = 0;
+ continue;
}
- // Synchronization provided by Controller Gauge to make sure that only
one thread updates the gauge
- _metricsRegistry.setValueOfTableGauge(tableName,
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
- _metricsRegistry.setValueOfTableGauge(tableName,
ControllerGauge.PERCENT_OF_REPLICAS,
- (nReplicasIdealMax > 0) ? (nReplicasExternal * 100 /
nReplicasIdealMax) : 100);
- _metricsRegistry.setValueOfTableGauge(tableName,
ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
- _metricsRegistry.setValueOfTableGauge(tableName,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
- (nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
- if (nOffline > 0) {
- LOGGER.warn("Table {} has {} segments with no online replicas",
tableName, nOffline);
+ for (Map.Entry<String, String> serverAndState :
externalView.getStateMap(partitionName).entrySet()) {
+ // Count number of online replicas. Ignore if state is CONSUMING.
+ // It is possible for a segment to be ONLINE in idealstate, and
CONSUMING in EV for a short period of time.
+ // So, ignore this combination. If a segment exists in this
combination for a long time, we will get
+ // low level-partition-not-consuming alert anyway.
+ if (serverAndState.getValue().equals(ONLINE) ||
serverAndState.getValue().equals(CONSUMING)) {
+ nReplicas++;
+ }
+ if (serverAndState.getValue().equals(ERROR)) {
+ nErrors++;
+ }
}
- if (nReplicasExternal < nReplicasIdealMax) {
- LOGGER.warn("Table {} has {} replicas, below replication threshold
:{}", tableName, nReplicasExternal,
- nReplicasIdealMax);
+ if (nReplicas == 0) {
+ if (nOffline < MaxOfflineSegmentsToLog) {
+ LOGGER.warn("Segment {} of table {} has no online replicas",
partitionName, tableNameWithType);
+ }
+ nOffline++;
}
- } catch (Exception e) {
- LOGGER.warn("Caught exception while updating segment status for table
{}", e, tableName);
-
- // Remove the metric for this table
- resetTableMetrics(tableName);
+ nReplicasExternal =
+ ((nReplicasExternal > nReplicas) || (nReplicasExternal == -1)) ?
nReplicas : nReplicasExternal;
}
- }
+ if (nReplicasExternal == -1) {
+ nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
+ }
+ // Synchronization provided by Controller Gauge to make sure that only
one thread updates the gauge
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType,
ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType,
ControllerGauge.PERCENT_OF_REPLICAS,
+ (nReplicasIdealMax > 0) ? (nReplicasExternal * 100 /
nReplicasIdealMax) : 100);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
+ (nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
+ if (nOffline > 0) {
+ LOGGER.warn("Table {} has {} segments with no online replicas",
tableNameWithType, nOffline);
+ }
+ if (nReplicasExternal < nReplicasIdealMax) {
+ LOGGER.warn("Table {} has {} replicas, below replication threshold
:{}", tableNameWithType, nReplicasExternal,
+ nReplicasIdealMax);
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while updating segment status for table
{}", e, tableNameWithType);
-
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT,
realTimeTableCount);
-
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT,
offlineTableCount);
-
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT,
disabledTableCount);
+ // Remove the metric for this table
+ resetTableMetrics(tableNameWithType);
+ }
}
private void setStatusToDefault() {
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
index 0ff695d..729c75c 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -49,6 +49,11 @@ public class PinotTaskManager extends ControllerPeriodicTask
{
private final TaskGeneratorRegistry _taskGeneratorRegistry;
private final ControllerMetrics _controllerMetrics;
+ private Map<String, List<TableConfig>> _enabledTableConfigMap;
+ private Set<String> _taskTypes;
+ private int _numTaskTypes;
+ private Map<String, String> _tasksScheduled;
+
public PinotTaskManager(@Nonnull PinotHelixTaskResourceManager
helixTaskResourceManager,
@Nonnull PinotHelixResourceManager helixResourceManager, @Nonnull
ControllerConf controllerConf,
@Nonnull ControllerMetrics controllerMetrics) {
@@ -81,81 +86,81 @@ public class PinotTaskManager extends
ControllerPeriodicTask {
}
/**
- * Check the Pinot cluster status and schedule new tasks for the given
tables.
- *
- * @param tables List of table names
- * @return Map from task type to task scheduled
+ * Public API to schedule tasks. It doesn't matter whether current pinot
controller is leader.
*/
- @Nonnull
- private Map<String, String> scheduleTasks(List<String> tables) {
+ public Map<String, String> scheduleTasks() {
+ process(_pinotHelixResourceManager.getAllTables());
+ return getTasksScheduled();
+ }
+
+ /**
+ * Performs necessary cleanups (e.g. remove metrics) when the controller
leadership changes.
+ */
+ @Override
+ public void onBecomeNotLeader() {
+ LOGGER.info("Perform task cleanups.");
+ // Performs necessary cleanups for each task type.
+ for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
+ _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
+ }
+ }
+
+
+ @Override
+ protected void preprocess() {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
1L);
- Set<String> taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
- int numTaskTypes = taskTypes.size();
- Map<String, List<TableConfig>> enabledTableConfigMap = new
HashMap<>(numTaskTypes);
+ _taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
+ _numTaskTypes = _taskTypes.size();
+ _enabledTableConfigMap = new HashMap<>(_numTaskTypes);
- for (String taskType : taskTypes) {
- enabledTableConfigMap.put(taskType, new ArrayList<>());
+ for (String taskType : _taskTypes) {
+ _enabledTableConfigMap.put(taskType, new ArrayList<>());
// Ensure all task queues exist
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
}
+ }
- // Scan all table configs to get the tables with tasks enabled
- for (String tableName : tables) {
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableName);
- if (tableConfig != null) {
- TableTaskConfig taskConfig = tableConfig.getTaskConfig();
- if (taskConfig != null) {
- for (String taskType : taskTypes) {
- if (taskConfig.isTaskTypeEnabled(taskType)) {
- enabledTableConfigMap.get(taskType).add(tableConfig);
- }
+ @Override
+ protected void processTable(String tableNameWithType) {
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig != null) {
+ TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+ if (taskConfig != null) {
+ for (String taskType : _taskTypes) {
+ if (taskConfig.isTaskTypeEnabled(taskType)) {
+ _enabledTableConfigMap.get(taskType).add(tableConfig);
}
}
}
}
+ }
+ @Override
+ protected void postprocess() {
// Generate each type of tasks
- Map<String, String> tasksScheduled = new HashMap<>(numTaskTypes);
- for (String taskType : taskTypes) {
+ _tasksScheduled = new HashMap<>(_numTaskTypes);
+ for (String taskType : _taskTypes) {
LOGGER.info("Generating tasks for task type: {}", taskType);
PinotTaskGenerator pinotTaskGenerator =
_taskGeneratorRegistry.getTaskGenerator(taskType);
- List<PinotTaskConfig> pinotTaskConfigs =
pinotTaskGenerator.generateTasks(enabledTableConfigMap.get(taskType));
+ List<PinotTaskConfig> pinotTaskConfigs =
pinotTaskGenerator.generateTasks(_enabledTableConfigMap.get(taskType));
int numTasks = pinotTaskConfigs.size();
if (numTasks > 0) {
LOGGER.info("Submitting {} tasks for task type: {} with task configs:
{}", numTasks, taskType,
pinotTaskConfigs);
- tasksScheduled.put(taskType,
_helixTaskResourceManager.submitTask(pinotTaskConfigs,
+ _tasksScheduled.put(taskType,
_helixTaskResourceManager.submitTask(pinotTaskConfigs,
pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
_controllerMetrics.addMeteredTableValue(taskType,
ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
}
}
-
- return tasksScheduled;
}
/**
- * Public API to schedule tasks. It doesn't matter whether current pinot
controller is leader.
- */
- public Map<String, String> scheduleTasks() {
- return scheduleTasks(_pinotHelixResourceManager.getAllTables());
- }
-
- /**
- * Performs necessary cleanups (e.g. remove metrics) when the controller
leadership changes.
+ * Returns the tasks that have been scheduled as part of the postprocess
+ * @return
*/
- @Override
- public void onBecomeNotLeader() {
- LOGGER.info("Perform task cleanups.");
- // Performs necessary cleanups for each task type.
- for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
- _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
- }
- }
-
- @Override
- public void process(List<String> tables) {
- scheduleTasks(tables);
+ private Map<String, String> getTasksScheduled() {
+ return _tasksScheduled;
}
}
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 3b12c0f..c416d0e 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -110,7 +110,29 @@ public abstract class ControllerPeriodicTask extends
BasePeriodicTask {
*
* @param tables List of table names
*/
- public abstract void process(List<String> tables);
+ protected void process(List<String> tables) {
+ preprocess();
+ for (String table : tables) {
+ processTable(table);
+ }
+ postprocess();
+ }
+
+ /**
+ * This method runs before processing all tables
+ */
+ protected abstract void preprocess();
+
+ /**
+ * Execute the controller periodic task for the given table
+ * @param tableNameWithType
+ */
+ protected abstract void processTable(String tableNameWithType);
+
+ /**
+ * This method runs after processing all tables
+ */
+ protected abstract void postprocess();
@VisibleForTesting
protected boolean isLeader() {
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index d152887..e324c11 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -58,51 +58,59 @@ public class RealtimeSegmentRelocator extends
ControllerPeriodicTask {
}
@Override
- public void process(List<String> tables) {
- runRelocation(tables);
+ protected void preprocess() {
+
+ }
+
+ @Override
+ protected void processTable(String tableNameWithType) {
+ runRelocation(tableNameWithType);
+ }
+
+ @Override
+ protected void postprocess() {
+
}
/**
- * Check the given tables. Perform relocation of segments if table is
realtime and relocation is required
+ * Check the given table. Perform relocation of segments if table is
realtime and relocation is required
* TODO: Model this to implement {@link
com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy}
interface
* https://github.com/linkedin/pinot/issues/2609
*
- * @param tables List of table names
+ * @param tableNameWithType
*/
- private void runRelocation(List<String> tables) {
- for (String tableNameWithType : tables) {
- // Only consider realtime tables.
- if (!TableNameBuilder.REALTIME.tableHasTypeSuffix(tableNameWithType)) {
- continue;
+ private void runRelocation(String tableNameWithType) {
+ // Only consider realtime tables.
+ if (!TableNameBuilder.REALTIME.tableHasTypeSuffix(tableNameWithType)) {
+ return;
+ }
+ try {
+ LOGGER.info("Starting relocation of segments for table: {}",
tableNameWithType);
+
+ TableConfig tableConfig =
_pinotHelixResourceManager.getRealtimeTableConfig(tableNameWithType);
+ final RealtimeTagConfig realtimeTagConfig = new
RealtimeTagConfig(tableConfig);
+ if (!realtimeTagConfig.isRelocateCompletedSegments()) {
+ LOGGER.info("Skipping relocation of segments for {}",
tableNameWithType);
+ return;
}
- try {
- LOGGER.info("Starting relocation of segments for table: {}",
tableNameWithType);
-
- TableConfig tableConfig =
_pinotHelixResourceManager.getRealtimeTableConfig(tableNameWithType);
- final RealtimeTagConfig realtimeTagConfig = new
RealtimeTagConfig(tableConfig);
- if (!realtimeTagConfig.isRelocateCompletedSegments()) {
- LOGGER.info("Skipping relocation of segments for {}",
tableNameWithType);
- continue;
- }
- Function<IdealState, IdealState> updater = new Function<IdealState,
IdealState>() {
- @Nullable
- @Override
- public IdealState apply(@Nullable IdealState idealState) {
- if (!idealState.isEnabled()) {
- LOGGER.info("Skipping relocation of segments for {} since ideal
state is disabled", tableNameWithType);
- return null;
- }
- relocateSegments(realtimeTagConfig, idealState);
- return idealState;
+ Function<IdealState, IdealState> updater = new Function<IdealState,
IdealState>() {
+ @Nullable
+ @Override
+ public IdealState apply(@Nullable IdealState idealState) {
+ if (!idealState.isEnabled()) {
+ LOGGER.info("Skipping relocation of segments for {} since ideal
state is disabled", tableNameWithType);
+ return null;
}
- };
+ relocateSegments(realtimeTagConfig, idealState);
+ return idealState;
+ }
+ };
-
HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(),
tableNameWithType, updater,
- RetryPolicies.exponentialBackoffRetryPolicy(5, 1000, 2.0f));
- } catch (Exception e) {
- LOGGER.error("Exception in relocating realtime segments of table {}",
tableNameWithType, e);
- }
+
HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(),
tableNameWithType, updater,
+ RetryPolicies.exponentialBackoffRetryPolicy(5, 1000, 2.0f));
+ } catch (Exception e) {
+ LOGGER.error("Exception in relocating realtime segments of table {}",
tableNameWithType, e);
}
}
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index d76cc3b..80894fc 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -62,29 +62,26 @@ public class RetentionManager extends
ControllerPeriodicTask {
}
@Override
- public void process(List<String> tables) {
- execute(tables);
+ protected void preprocess() {
+
}
- /**
- * Manages retention for the given tables.
- *
- * @param tables List of table names
- */
- private void execute(List<String> tables) {
+ @Override
+ protected void processTable(String tableNameWithType) {
try {
- for (String tableNameWithType : tables) {
- LOGGER.info("Start managing retention for table: {}",
tableNameWithType);
- manageRetentionForTable(tableNameWithType);
- }
-
- LOGGER.info("Removing aged (more than {} days) deleted segments for all
tables", _deletedSegmentsRetentionInDays);
-
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
+ LOGGER.info("Start managing retention for table: {}", tableNameWithType);
+ manageRetentionForTable(tableNameWithType);
} catch (Exception e) {
LOGGER.error("Caught exception while managing retention for all tables",
e);
}
}
+ @Override
+ protected void postprocess() {
+ LOGGER.info("Removing aged (more than {} days) deleted segments for all
tables", _deletedSegmentsRetentionInDays);
+
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
+ }
+
private void manageRetentionForTable(String tableNameWithType) {
try {
// Build retention strategy from table config
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
index 8717318..18025eb 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
@@ -23,7 +23,7 @@ import com.linkedin.pinot.common.config.TableNameBuilder;
import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import com.linkedin.pinot.common.metrics.ValidationMetrics;
-import com.linkedin.pinot.common.utils.CommonConstants.Helix.TableType;
+import com.linkedin.pinot.common.utils.CommonConstants;
import com.linkedin.pinot.common.utils.HLCSegmentName;
import com.linkedin.pinot.common.utils.SegmentName;
import com.linkedin.pinot.common.utils.time.TimeUtils;
@@ -57,6 +57,8 @@ public class ValidationManager extends ControllerPeriodicTask
{
private final ValidationMetrics _validationMetrics;
private long _lastSegmentLevelValidationTimeMs = 0L;
+ private boolean _runSegmentLevelValidation;
+ private List<InstanceConfig> _instanceConfigs;
public ValidationManager(ControllerConf config, PinotHelixResourceManager
pinotHelixResourceManager,
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
ValidationMetrics validationMetrics) {
@@ -74,61 +76,62 @@ public class ValidationManager extends
ControllerPeriodicTask {
}
@Override
- public void process(List<String> tables) {
- runValidation(tables);
- }
-
- /**
- * Runs a validation pass over the given tables.
- *
- * @param tables List of table names
- */
- private void runValidation(List<String> tables) {
+ protected void preprocess() {
// Run segment level validation using a separate interval
- boolean runSegmentLevelValidation = false;
+ _runSegmentLevelValidation = false;
long currentTimeMs = System.currentTimeMillis();
if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs -
_lastSegmentLevelValidationTimeMs)
>= _segmentLevelValidationIntervalInSeconds) {
LOGGER.info("Run segment-level validation");
- runSegmentLevelValidation = true;
+ _runSegmentLevelValidation = true;
_lastSegmentLevelValidationTimeMs = currentTimeMs;
}
// Cache instance configs to reduce ZK access
- List<InstanceConfig> instanceConfigs =
_pinotHelixResourceManager.getAllHelixInstanceConfigs();
-
- for (String tableNameWithType : tables) {
- try {
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping
validation", tableNameWithType);
- continue;
- }
+ _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
+ }
- // Rebuild broker resource
- Set<String> brokerInstances =
_pinotHelixResourceManager.getAllInstancesForBrokerTenant(instanceConfigs,
- tableConfig.getTenantConfig().getBroker());
- _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType,
brokerInstances);
+ @Override
+ protected void processTable(String tableNameWithType) {
+ runValidation(tableNameWithType);
+ }
- // Perform validation based on the table type
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if (tableType == TableType.OFFLINE) {
- if (runSegmentLevelValidation) {
- validateOfflineSegmentPush(tableConfig);
- }
- } else {
- if (runSegmentLevelValidation) {
- updateRealtimeDocumentCount(tableConfig);
- }
- Map<String, String> streamConfigMap =
tableConfig.getIndexingConfig().getStreamConfigs();
- StreamConfig streamConfig = new StreamConfig(streamConfigMap);
- if (streamConfig.hasLowLevelConsumerType()) {
- _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
- }
+ @Override
+ protected void postprocess() {
+
+ }
+
+ private void runValidation(String tableNameWithType) {
+ try {
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping
validation", tableNameWithType);
+ return;
+ }
+
+ // Rebuild broker resource
+ Set<String> brokerInstances =
_pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
+ tableConfig.getTenantConfig().getBroker());
+ _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType,
brokerInstances);
+
+ // Perform validation based on the table type
+ CommonConstants.Helix.TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
+ if (_runSegmentLevelValidation) {
+ validateOfflineSegmentPush(tableConfig);
+ }
+ } else {
+ if (_runSegmentLevelValidation) {
+ updateRealtimeDocumentCount(tableConfig);
+ }
+ Map<String, String> streamConfigMap =
tableConfig.getIndexingConfig().getStreamConfigs();
+ StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+ if (streamConfig.hasLowLevelConsumerType()) {
+ _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
}
- } catch (Exception e) {
- LOGGER.warn("Caught exception while validating table: {}",
tableNameWithType, e);
}
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while validating table: {}",
tableNameWithType, e);
}
}
diff --git
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 9a5d26c..94c9f52 100644
---
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -115,7 +115,22 @@ public class ControllerPeriodicTaskTest {
}
@Override
- public void process(List<String> tables) {
+ protected void process(List<String> tables) {
+
+ }
+
+ @Override
+ protected void preprocess() {
+
+ }
+
+ @Override
+ protected void processTable(String tableNameWithType) {
+
+ }
+
+ @Override
+ public void postprocess() {
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]