jineshparakh commented on code in PR #17741:
URL: https://github.com/apache/pinot/pull/17741#discussion_r2842086000


##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java:
##########
@@ -120,6 +120,9 @@ public enum ControllerGauge implements 
AbstractMetrics.Gauge {
   // Number of Tasks Status
   TASK_STATUS("taskStatus", false),
 
+  // Number of jobs in a task queue (per task type)
+  TASK_QUEUE_SIZE("taskQueueSize", false),

Review Comment:
   Done



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -1061,6 +1115,129 @@ protected void nonLeaderCleanup(List<String> 
tableNamesWithType) {
     }
   }
 
+  @Override
+  public void onChange(Set<String> changedConfigs, Map<String, String> 
clusterConfigs) {
+    // Parent implementation is a no-op; no need to call super.onChange().
+    // This needed to be revisited if parent behavior changes
+    if (clusterConfigs == null) {
+      return;
+    }
+
+    if (changedConfigs == null
+        || 
changedConfigs.contains(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS))
 {
+      String taskExpireStr = clusterConfigs.get(
+          
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS);
+      if (taskExpireStr != null) {
+        try {
+          long parsed = Long.parseLong(taskExpireStr);
+          if (parsed <= 0) {
+            LOGGER.warn("Ignoring non-positive value {} for {}", parsed,
+                
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS);
+          } else {
+            _helixTaskResourceManager.setTaskExpireTimeMs(parsed);
+          }
+        } catch (NumberFormatException e) {
+          LOGGER.warn("Invalid value for {}: {}",
+              
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS, 
taskExpireStr);
+        }
+      }
+    }
+
+    if (changedConfigs == null
+        || changedConfigs.contains(
+            
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_TERMINAL_STATE_EXPIRE_TIME_MS))
 {
+      String terminalExpireStr = clusterConfigs.get(
+          
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_TERMINAL_STATE_EXPIRE_TIME_MS);
+      if (terminalExpireStr != null) {
+        try {
+          long parsed = Long.parseLong(terminalExpireStr);
+          if (parsed <= 0) {
+            LOGGER.warn("Ignoring non-positive value {} for {}", parsed,
+                
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_TERMINAL_STATE_EXPIRE_TIME_MS);
+          } else {
+            _helixTaskResourceManager.setTerminalStateExpireTimeMs(parsed);
+          }
+        } catch (NumberFormatException e) {
+          LOGGER.warn("Invalid value for {}: {}",
+              
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_TERMINAL_STATE_EXPIRE_TIME_MS,
 terminalExpireStr);
+        }
+      }
+    }
+
+    if (changedConfigs == null
+        || 
changedConfigs.contains(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_MAX_SIZE))
 {
+      String maxSizeStr = clusterConfigs.get(
+          
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_MAX_SIZE);
+      if (maxSizeStr != null) {
+        try {
+          _taskQueueMaxSize = Integer.parseInt(maxSizeStr);
+        } catch (NumberFormatException e) {
+          LOGGER.warn("Invalid value for {}: {}",
+              
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_MAX_SIZE, 
maxSizeStr);
+        }
+      }
+    }
+
+    if (changedConfigs == null
+        || changedConfigs.contains(
+            
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_MAX_DELETES_PER_CYCLE))
 {
+      String maxDeletesStr = clusterConfigs.get(
+          
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_MAX_DELETES_PER_CYCLE);
+      if (maxDeletesStr != null) {
+        try {
+          int parsed = Integer.parseInt(maxDeletesStr);
+          if (parsed <= 0) {
+            LOGGER.warn("Ignoring non-positive value {} for {}", parsed,
+                
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_MAX_DELETES_PER_CYCLE);
+          } else if (parsed > MAX_DELETES_PER_CYCLE_CAP) {
+            LOGGER.warn("Clamping {} to cap {} for {}", parsed, 
MAX_DELETES_PER_CYCLE_CAP,
+                
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_MAX_DELETES_PER_CYCLE);
+            _taskQueueMaxDeletesPerCycle = MAX_DELETES_PER_CYCLE_CAP;
+          } else {
+            _taskQueueMaxDeletesPerCycle = parsed;
+          }
+        } catch (NumberFormatException e) {
+          LOGGER.warn("Invalid value for {}: {}",
+              
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_MAX_DELETES_PER_CYCLE,
 maxDeletesStr);
+        }
+      }
+    }
+
+    if (changedConfigs == null
+        || 
changedConfigs.contains(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_CAPACITY))
 {
+      String capacityStr = clusterConfigs.get(
+          
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_CAPACITY);
+      if (capacityStr != null) {
+        try {
+          int parsed = Integer.parseInt(capacityStr);
+          if (parsed > 0 || parsed == -1) {
+            _helixTaskResourceManager.setQueueCapacity(parsed);
+          } else {
+            LOGGER.warn("Ignoring invalid capacity {} for {}. Must be -1 
(unlimited) or positive.",
+                parsed, 
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_CAPACITY);
+          }
+        } catch (NumberFormatException e) {
+          LOGGER.warn("Invalid value for {}: {}",
+              
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_CAPACITY, 
capacityStr);
+        }
+      }
+    }
+
+    if (changedConfigs == null
+        || 
changedConfigs.contains(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_WARNING_THRESHOLD))
 {
+      String warningStr = clusterConfigs.get(
+          
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_WARNING_THRESHOLD);
+      if (warningStr != null) {
+        try {
+          _taskQueueWarningThreshold = Integer.parseInt(warningStr);
+        } catch (NumberFormatException e) {
+          LOGGER.warn("Invalid value for {}: {}",
+              
ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_QUEUE_WARNING_THRESHOLD, 
warningStr);
+        }
+      }
+    }
+  }

Review Comment:
   I refactored the method.
   With individual log lines, we have better visibility of the exact change 
from oldValue to newValue.
   We loose that with consolidated logging.
   
   I would like to keep individual logs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to