shounakmk219 commented on code in PR #17741:
URL: https://github.com/apache/pinot/pull/17741#discussion_r2840302251
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -1061,6 +1115,129 @@ protected void nonLeaderCleanup(List<String>
tableNamesWithType) {
}
}
+ @Override
+ public void onChange(Set<String> changedConfigs, Map<String, String>
clusterConfigs) {
+ // Parent implementation is a no-op; no need to call super.onChange().
+ // This needed to be revisited if parent behavior changes
+ if (clusterConfigs == null) {
+ return;
+ }
+
+ if (changedConfigs == null
+ ||
changedConfigs.contains(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS))
{
+ String taskExpireStr = clusterConfigs.get(
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS);
+ if (taskExpireStr != null) {
+ try {
+ long parsed = Long.parseLong(taskExpireStr);
+ if (parsed <= 0) {
+ LOGGER.warn("Ignoring non-positive value {} for {}", parsed,
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS);
+ } else {
+ _helixTaskResourceManager.setTaskExpireTimeMs(parsed);
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Invalid value for {}: {}",
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS,
taskExpireStr);
+ }
+ }
+ }
+
+ if (changedConfigs == null
Review Comment:
Can we pull out all the config assignments into separate methods and keep
onChange concise?
##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java:
##########
@@ -120,6 +120,9 @@ public enum ControllerGauge implements
AbstractMetrics.Gauge {
// Number of Tasks Status
TASK_STATUS("taskStatus", false),
+ // Number of jobs in a task queue (per task type)
+ TASK_QUEUE_SIZE("taskQueueSize", false),
Review Comment:
nit: Can we rename this to something like `TASKS_TRACKED_FOR_TASK_TYPE`
instead of having "queue" in the name? It's confusing for users and is usually
interpreted as the number of pending tasks in the task queue.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -1061,6 +1115,129 @@ protected void nonLeaderCleanup(List<String>
tableNamesWithType) {
}
}
+ @Override
+ public void onChange(Set<String> changedConfigs, Map<String, String>
clusterConfigs) {
+ // Parent implementation is a no-op; no need to call super.onChange().
+ // This needed to be revisited if parent behavior changes
+ if (clusterConfigs == null) {
+ return;
+ }
+
+ if (changedConfigs == null
Review Comment:
what does `changedConfigs == null` signify here?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -1061,6 +1115,129 @@ protected void nonLeaderCleanup(List<String>
tableNamesWithType) {
}
}
+ @Override
+ public void onChange(Set<String> changedConfigs, Map<String, String>
clusterConfigs) {
+ // Parent implementation is a no-op; no need to call super.onChange().
+ // This needed to be revisited if parent behavior changes
+ if (clusterConfigs == null) {
+ return;
+ }
+
+ if (changedConfigs == null
+ ||
changedConfigs.contains(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS))
{
+ String taskExpireStr = clusterConfigs.get(
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS);
+ if (taskExpireStr != null) {
+ try {
+ long parsed = Long.parseLong(taskExpireStr);
+ if (parsed <= 0) {
+ LOGGER.warn("Ignoring non-positive value {} for {}", parsed,
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS);
+ } else {
+ _helixTaskResourceManager.setTaskExpireTimeMs(parsed);
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Invalid value for {}: {}",
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS,
taskExpireStr);
+ }
+ }
+ }
+
+ if (changedConfigs == null
+ || changedConfigs.contains(
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_TERMINAL_STATE_EXPIRE_TIME_MS))
{
+ String terminalExpireStr = clusterConfigs.get(
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_TERMINAL_STATE_EXPIRE_TIME_MS);
+ if (terminalExpireStr != null) {
+ try {
+ long parsed = Long.parseLong(terminalExpireStr);
+ if (parsed <= 0) {
+ LOGGER.warn("Ignoring non-positive value {} for {}", parsed,
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_TERMINAL_STATE_EXPIRE_TIME_MS);
+ } else {
+ _helixTaskResourceManager.setTerminalStateExpireTimeMs(parsed);
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Invalid value for {}: {}",
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_TERMINAL_STATE_EXPIRE_TIME_MS,
terminalExpireStr);
+ }
+ }
+ }
+
+ if (changedConfigs == null
+ ||
changedConfigs.contains(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_MAX_SIZE))
{
+ String maxSizeStr = clusterConfigs.get(
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_MAX_SIZE);
+ if (maxSizeStr != null) {
+ try {
+ _taskQueueMaxSize = Integer.parseInt(maxSizeStr);
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Invalid value for {}: {}",
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_MAX_SIZE,
maxSizeStr);
+ }
+ }
+ }
+
+ if (changedConfigs == null
+ || changedConfigs.contains(
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_MAX_DELETES_PER_CYCLE))
{
+ String maxDeletesStr = clusterConfigs.get(
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_MAX_DELETES_PER_CYCLE);
+ if (maxDeletesStr != null) {
+ try {
+ int parsed = Integer.parseInt(maxDeletesStr);
+ if (parsed <= 0) {
+ LOGGER.warn("Ignoring non-positive value {} for {}", parsed,
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_MAX_DELETES_PER_CYCLE);
+ } else if (parsed > MAX_DELETES_PER_CYCLE_CAP) {
+ LOGGER.warn("Clamping {} to cap {} for {}", parsed,
MAX_DELETES_PER_CYCLE_CAP,
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_MAX_DELETES_PER_CYCLE);
+ _taskQueueMaxDeletesPerCycle = MAX_DELETES_PER_CYCLE_CAP;
+ } else {
+ _taskQueueMaxDeletesPerCycle = parsed;
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Invalid value for {}: {}",
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_MAX_DELETES_PER_CYCLE,
maxDeletesStr);
+ }
+ }
+ }
+
+ if (changedConfigs == null
+ ||
changedConfigs.contains(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_CAPACITY))
{
+ String capacityStr = clusterConfigs.get(
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_CAPACITY);
+ if (capacityStr != null) {
+ try {
+ int parsed = Integer.parseInt(capacityStr);
+ if (parsed > 0 || parsed == -1) {
+ _helixTaskResourceManager.setQueueCapacity(parsed);
+ } else {
+ LOGGER.warn("Ignoring invalid capacity {} for {}. Must be -1
(unlimited) or positive.",
+ parsed,
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_CAPACITY);
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Invalid value for {}: {}",
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_CAPACITY,
capacityStr);
+ }
+ }
+ }
+
+ if (changedConfigs == null
+ ||
changedConfigs.contains(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_WARNING_THRESHOLD))
{
+ String warningStr = clusterConfigs.get(
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_WARNING_THRESHOLD);
+ if (warningStr != null) {
+ try {
+ _taskQueueWarningThreshold = Integer.parseInt(warningStr);
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Invalid value for {}: {}",
+
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_WARNING_THRESHOLD,
warningStr);
+ }
+ }
+ }
+ }
Review Comment:
Can we add a consolidated log line with all the values that were set, for
better visibility?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]