vvivekiyer commented on code in PR #16672:
URL: https://github.com/apache/pinot/pull/16672#discussion_r2376478050


##########
pinot-common/src/main/java/org/apache/pinot/common/messages/QueryWorkloadRefreshMessage.java:
##########
@@ -45,7 +45,10 @@ public QueryWorkloadRefreshMessage(String queryWorkloadName, 
String messageSubTy
     setMsgSubType(messageSubType);
     // Give it infinite time to process the message, as long as session is 
alive
     setExecutionTimeout(-1);
-    QueryWorkloadConfigUtils.updateZNRecordWithInstanceCost(getRecord(), 
queryWorkloadName, instanceCost);
+    // We don't expect the instance cost for non-refresh message
+    if (messageSubType.equals(REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE)) {
+      QueryWorkloadConfigUtils.updateZNRecordWithInstanceCost(getRecord(), 
queryWorkloadName, instanceCost);

Review Comment:
   Curious: Did we find this during testing? 



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java:
##########
@@ -211,24 +215,167 @@ public static List<String> 
validateQueryWorkloadConfig(QueryWorkloadConfig confi
         if (enforcementProfile == null) {
           errors.add(prefix + "enforcementProfile cannot be null");
         } else {
-           if (enforcementProfile.getCpuCostNs() < 0) {
+          long enforcementCpu = enforcementProfile.getCpuCostNs();
+          long enforcementMem = enforcementProfile.getMemoryCostBytes();
+           if (enforcementCpu < 0) {

Review Comment:
   Should this be <= 0? 
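
   For illustration only, a minimal sketch of the stricter check. `CostCheckSketch` and `validateCost` are hypothetical names, not part of this PR, and whether a zero budget should actually be rejected is the open question here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the PR: rejects zero as well as negative costs.
final class CostCheckSketch {
  private CostCheckSketch() {
  }

  static List<String> validateCost(String prefix, String field, long value) {
    List<String> errors = new ArrayList<>();
    // "<= 0" also flags a zero budget, which "< 0" would silently accept.
    if (value <= 0) {
      errors.add(prefix + "." + field + " must be positive");
    }
    return errors;
  }
}
```

   With `< 0`, a config with `cpuCostNs = 0` passes validation; with `<= 0` it is reported.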



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationUtils.java:
##########
@@ -101,61 +148,104 @@ private static void 
collectHelixTagsForTable(List<String> tags, TenantConfig ten
   }
 
   /**
-   * Get the helix tags for a given table name.
-   * If the table name does not have a type suffix, it will return both 
offline and realtime tags.
+   * Resolves Helix tags for a table.
+   *
+   * <p>
+   * If the input table name lacks a type suffix, both offline and realtime 
table names are expanded
+   * and resolved. Otherwise, the specific table name is used.
+   * </p>
+   *
+   * @param pinotResourceManager Resource manager to fetch table configs.
+   * @param tableName The raw or type-qualified table name.
+   * @return A list of Helix tags associated with the table.
    */
   public static List<String> getHelixTagsForTable(PinotHelixResourceManager 
pinotResourceManager, String tableName) {
+    if (tableName == null || tableName.trim().isEmpty()) {
+      throw new IllegalArgumentException("Table name cannot be null or empty");
+    }
+
     List<String> combinedTags = new ArrayList<>();
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
     List<String> tablesWithType = (tableType == null)
         ? Arrays.asList(TableNameBuilder.OFFLINE.tableNameWithType(tableName),
             TableNameBuilder.REALTIME.tableNameWithType(tableName))
         : Collections.singletonList(tableName);
     for (String table : tablesWithType) {
-      TableConfig tableConfig = pinotResourceManager.getTableConfig(table);
-      if (tableConfig != null) {
-        collectHelixTagsForTable(combinedTags, tableConfig.getTenantConfig(), 
tableConfig.getTableType());
+      try {
+        TableConfig tableConfig = pinotResourceManager.getTableConfig(table);
+        if (tableConfig != null && tableConfig.getTenantConfig() != null && 
tableConfig.getTableType() != null) {
+          collectHelixTagsForTable(combinedTags, 
tableConfig.getTenantConfig(), tableConfig.getTableType());
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to get table config for table: " + 
table, e);
       }
     }
     return combinedTags;
   }
 
   /**
-   * Get the mapping between helix tag -> instances
+   * Builds a mapping from Helix tag to the set of instances carrying that tag.
+   *
+   * @param pinotResourceManager Resource manager used to fetch instance 
configs.
+   * @return A mapping of Helix tag → set of instance names.
    */
   public static Map<String, Set<String>> 
getHelixTagToInstances(PinotHelixResourceManager pinotResourceManager) {
     Map<String, Set<String>> tagToInstances = new HashMap<>();
-    for (InstanceConfig instanceConfig : 
pinotResourceManager.getAllHelixInstanceConfigs()) {
-      String instanceName = instanceConfig.getInstanceName();
-      for (String helixTag : instanceConfig.getTags()) {
-        tagToInstances.computeIfAbsent(helixTag, tag -> new 
HashSet<>()).add(instanceName);
+    try {
+      List<InstanceConfig> instanceConfigs = 
pinotResourceManager.getAllHelixInstanceConfigs();
+      if (instanceConfigs == null) {
+        LOGGER.warn("No instance configs found, returning empty mapping");
+        return tagToInstances;
+      }
+
+      for (InstanceConfig instanceConfig : instanceConfigs) {
+        if (instanceConfig == null) {
+          LOGGER.warn("Skipping null instance config");
+          continue;
+        }
+        String instanceName = instanceConfig.getInstanceName();
+        List<String> tags = instanceConfig.getTags();
+        if (tags != null) {
+          for (String helixTag : tags) {
+            if (helixTag != null && !helixTag.trim().isEmpty()) {

Review Comment:
   Same here regarding error handling. Can the helixTag ever be null? 
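
   If the answer is no, the loop could stay close to its pre-PR shape. A rough sketch, using a plain `Map<instanceName, tags>` as a stand-in for Helix's `InstanceConfig` so that it compiles on its own; `TagMappingSketch` is a hypothetical name, and the code assumes tags are never null or blank, which is exactly what needs confirming.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Stand-in sketch: a Map<instanceName, tags> replaces InstanceConfig.
// Assumption (unverified): tags are never null or blank.
final class TagMappingSketch {
  private TagMappingSketch() {
  }

  static Map<String, Set<String>> tagToInstances(Map<String, List<String>> instanceToTags) {
    Map<String, Set<String>> tagToInstances = new HashMap<>();
    for (Map.Entry<String, List<String>> entry : instanceToTags.entrySet()) {
      for (String helixTag : entry.getValue()) {
        // Group instance names under each tag they carry.
        tagToInstances.computeIfAbsent(helixTag, tag -> new HashSet<>()).add(entry.getKey());
      }
    }
    return tagToInstances;
  }
}
```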



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationUtils.java:
##########
@@ -33,37 +33,72 @@
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.config.workload.InstanceCost;
 import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.config.workload.PropagationEntity;
 import org.apache.pinot.spi.config.workload.PropagationScheme;
 import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * This class provides utility methods for workload propagation.
+ * Provides utility methods for workload propagation in Pinot.
+ *
+ * <p>
+ * This class centralizes logic for resolving Helix tags, instance mappings, 
and matching query
+ * workload configs to their applicable propagation scope.
+ * </p>
  */
 public class PropagationUtils {
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PropagationUtils.class);
   private PropagationUtils() {
   }
 
   /**
-   * Get the mapping tableNameWithType → {BROKER_NODE→brokerTag, 
SERVER_NODE→(serverTag + overrides)}
-   * 1. Get all table configs from the PinotHelixResourceManager
-   * 2. For each table config, extract the tenant config
-   * 3. For each tenant config, get the broker and server tags
-   * 4. Populate the helix tags for BROKER_NODE and SERVER_NODE separately
+   * Builds a mapping from table name with type to Helix tags per node type.
+   *
+   * <p>For each table:</p>
+   * <ol>
+   *   <li>Fetch the {@link TenantConfig}.</li>
+   *   <li>Derive broker and server tags from the tenant configuration.</li>
+   *   <li>Associate {@link NodeConfig.Type#BROKER_NODE} with broker tags.</li>
+   *   <li>Associate {@link NodeConfig.Type#SERVER_NODE} with server tags 
(consuming and/or completed
+   *       for realtime, or offline for batch).</li>
+   * </ol>
+   *
+   * @param pinotResourceManager Resource manager used to fetch table configs.
+   * @return A mapping of tableNameWithType to node type → Helix tags.
    */
   public static Map<String, Map<NodeConfig.Type, Set<String>>> 
getTableToHelixTags(
           PinotHelixResourceManager pinotResourceManager) {
     Map<String, Map<NodeConfig.Type, Set<String>>> tableToTags = new 
HashMap<>();
-    for (TableConfig tableConfig : pinotResourceManager.getAllTableConfigs()) {
+    List<TableConfig> tableConfigs = pinotResourceManager.getAllTableConfigs();
+    if (tableConfigs == null) {

Review Comment:
   getAllTableConfigs never returns null; it returns an empty list. 
   
   Can we check all such error-handling places and clean up the code a little bit? 



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java:
##########
@@ -211,24 +215,167 @@ public static List<String> 
validateQueryWorkloadConfig(QueryWorkloadConfig confi
         if (enforcementProfile == null) {
           errors.add(prefix + "enforcementProfile cannot be null");
         } else {
-           if (enforcementProfile.getCpuCostNs() < 0) {
+          long enforcementCpu = enforcementProfile.getCpuCostNs();
+          long enforcementMem = enforcementProfile.getMemoryCostBytes();
+           if (enforcementCpu < 0) {
              errors.add(prefix + ".enforcementProfile.cpuCostNs cannot be 
negative");
            }
-           if (enforcementProfile.getMemoryCostBytes() < 0) {
-               errors.add(prefix + ".enforcementProfile.memoryCostBytes cannot 
be negative");
+           if (enforcementMem < 0) {
+             errors.add(prefix + ".enforcementProfile.memoryCostBytes cannot 
be negative");
            }
-        }
-        // Validate PropagationScheme
-        PropagationScheme propagationScheme = 
nodeConfig.getPropagationScheme();
-        if (propagationScheme == null) {
-          errors.add(prefix + ".propagationScheme cannot be null");
-        } else {
-          if (propagationScheme.getPropagationType() == null) {
-            errors.add(prefix + ".propagationScheme.type cannot be null");
+          // Validate PropagationScheme
+          PropagationScheme propagationScheme = 
nodeConfig.getPropagationScheme();
+          if (propagationScheme == null) {
+            errors.add(prefix + ".propagationScheme cannot be null");
+          } else {
+            PropagationScheme.Type propagationType = 
propagationScheme.getPropagationType();
+            if (propagationType == null) {
+              errors.add(prefix + ".propagationScheme.type cannot be null");

Review Comment:
   Why are we not continuing to the next nodeConfig when we hit errors here? Is 
   the idea to collect all errors and surface them in one go? 
   
   The reason I ask: if propagationType is null, how do you even validate 
   the propagationEntities meaningfully? 
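
   One possible shape, sketched with plain strings and lists standing in for `PropagationScheme` so that it compiles on its own. `SchemeValidationSketch` and `validateScheme` are hypothetical names; the real method would `continue` to the next nodeConfig rather than return.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in sketch: a String type and a List<String> of entities replace PropagationScheme.
final class SchemeValidationSketch {
  private SchemeValidationSketch() {
  }

  static List<String> validateScheme(String prefix, String propagationType, List<String> entities) {
    List<String> errors = new ArrayList<>();
    if (propagationType == null) {
      errors.add(prefix + ".propagationScheme.type cannot be null");
      // Without a type the entities cannot be interpreted, so skip validating them.
      return errors;
    }
    if (entities == null || entities.isEmpty()) {
      errors.add(prefix + ".propagationScheme.propagationEntities cannot be null or empty");
    }
    return errors;
  }
}
```

   Errors from other nodeConfigs are still collected and surfaced together; only the checks that depend on the missing type are skipped.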



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java:
##########
@@ -211,24 +215,167 @@ public static List<String> 
validateQueryWorkloadConfig(QueryWorkloadConfig confi
         if (enforcementProfile == null) {
           errors.add(prefix + "enforcementProfile cannot be null");
         } else {
-           if (enforcementProfile.getCpuCostNs() < 0) {
+          long enforcementCpu = enforcementProfile.getCpuCostNs();
+          long enforcementMem = enforcementProfile.getMemoryCostBytes();
+           if (enforcementCpu < 0) {
              errors.add(prefix + ".enforcementProfile.cpuCostNs cannot be 
negative");
            }
-           if (enforcementProfile.getMemoryCostBytes() < 0) {
-               errors.add(prefix + ".enforcementProfile.memoryCostBytes cannot 
be negative");
+           if (enforcementMem < 0) {
+             errors.add(prefix + ".enforcementProfile.memoryCostBytes cannot 
be negative");
            }
-        }
-        // Validate PropagationScheme
-        PropagationScheme propagationScheme = 
nodeConfig.getPropagationScheme();
-        if (propagationScheme == null) {
-          errors.add(prefix + ".propagationScheme cannot be null");
-        } else {
-          if (propagationScheme.getPropagationType() == null) {
-            errors.add(prefix + ".propagationScheme.type cannot be null");
+          // Validate PropagationScheme
+          PropagationScheme propagationScheme = 
nodeConfig.getPropagationScheme();
+          if (propagationScheme == null) {
+            errors.add(prefix + ".propagationScheme cannot be null");
+          } else {
+            PropagationScheme.Type propagationType = 
propagationScheme.getPropagationType();
+            if (propagationType == null) {
+              errors.add(prefix + ".propagationScheme.type cannot be null");
+            }
+            // Validate PropagationEntities
+            validateEntityList(propagationScheme.getPropagationEntities(),
+                prefix + ".propagationScheme.propagationEntities", errors,
+                enforcementProfile.getCpuCostNs(), 
enforcementProfile.getMemoryCostBytes());
           }
         }
       }
     }
     return errors;
   }
+
+  /**
+   * Validates a list of PropagationEntity objects.
+   * <p>
+   * This method performs comprehensive validation including:
+   * <ul>
+   *   <li>Ensures the list is non-null and non-empty</li>
+   *   <li>Checks for duplicate propagationEntity IDs</li>
+   *   <li>Validates cpuCostNs and memoryCostBytes for non-null/non-negative 
values</li>
+   *   <li>Ensures consistency in cost definitions across all entities (either 
all or none define costs)</li>
+   *   <li>Validates that total costs do not exceed provided limits (if 
any)</li>
+   *   <li>Validates any overrides within each entity for the same cost 
rules</li>
+   *   <li>Rewrites empty costs to evenly distribute parent limits if all 
entities have empty costs</li>
+   * </ul>
+   *
+   */
+  private static void validateEntityList(List<PropagationEntity> entities, 
String prefix,
+                                         List<String> errors, Long limitCpu, 
Long limitMem) {
+    if (entities == null || entities.isEmpty()) {
+      errors.add(prefix + " cannot be null or empty");
+      return;
+    }
+    Set<String> seenIds = new HashSet<>();
+    // Accumulate total CPU/memory costs to ensure they don't exceed 
enforcementProfile limits
+    long totalCpu = 0;
+    long totalMem = 0;
+    // Track whether costs are defined or empty to ensure consistency across 
all entities
+    int definedCount = 0;
+    int emptyCount = 0;
+    for (int i = 0; i < entities.size(); i++) {
+      PropagationEntity entity = entities.get(i);
+      String entityPrefix = prefix + "[" + i + "]";
+      if (entity == null) {
+        errors.add(entityPrefix + " cannot be null");
+        continue;
+      }
+      validateDuplicateEntity(entity.getEntity(), entityPrefix, seenIds, 
errors);
+      Long currentCpu = entity.getCpuCostNs();
+      Long currentMem = entity.getMemoryCostBytes();
+      // Both costs must be defined or both null
+      // If both are defined, add to totalCpu and totalMem for limit validation
+      // If both are null, do nothing
+      if (currentCpu != null && currentMem != null) {
+        totalCpu += costOrZero(entityPrefix, "cpuCostNs", currentCpu, errors);
+        totalMem += costOrZero(entityPrefix, "memoryCostBytes", currentMem, 
errors);
+        definedCount++;
+      } else if (currentCpu == null && currentMem == null) {
+        emptyCount++;
+      } else {
+        errors.add(entityPrefix + " must have both cpuCostNs and 
memoryCostBytes defined or both null");
+        break;
+      }
+      if (definedCount > 0 && emptyCount > 0) {
+        errors.add(prefix + " must have either all or none of the 
propagationEntities define costs");
+        break;
+      }
+      List<PropagationEntityOverrides> overrides = entity.getOverrides();
+      if (overrides != null && !overrides.isEmpty()) {
+        validateOverrides(overrides, entityPrefix, errors, currentCpu, 
currentMem);
+      }
+    }
+    validateLimits(totalCpu, totalMem, limitCpu, limitMem, prefix, errors);
+    // If no errors and all entities have empty costs, rewrite to evenly 
distribute enforcementProfile costs
+    if (errors.isEmpty() && definedCount == 0 && emptyCount == 
entities.size()) {

Review Comment:
   errors will be non-empty if some entities have cost defined. So, can this 
   check just be if (errors.isEmpty() && definedCount == 0)? 
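
   To double-check the reasoning, here is a small sketch that mirrors only the counting logic from the diff. Each entity is modeled as a `Long[]{cpuCostNs, memoryCostBytes}` and a null array models a null entity; `RedistributeCheckSketch` is a hypothetical name. Whenever errors is empty, every entity has incremented either definedCount or emptyCount, so `definedCount == 0` already implies `emptyCount == entities.size()`.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in sketch: Long[]{cpu, mem} replaces PropagationEntity; overrides and limits are omitted.
final class RedistributeCheckSketch {
  private RedistributeCheckSketch() {
  }

  // Returns {fullCheck, shortCheck} so the two conditions can be compared.
  static boolean[] checks(List<Long[]> entities) {
    List<String> errors = new ArrayList<>();
    int definedCount = 0;
    int emptyCount = 0;
    for (Long[] entity : entities) {
      if (entity == null) {
        errors.add("entity cannot be null");
        continue;
      }
      if (entity[0] != null && entity[1] != null) {
        definedCount++;
      } else if (entity[0] == null && entity[1] == null) {
        emptyCount++;
      } else {
        errors.add("both costs must be defined or both null");
        break;
      }
      if (definedCount > 0 && emptyCount > 0) {
        errors.add("either all or none of the entities may define costs");
        break;
      }
    }
    boolean fullCheck = errors.isEmpty() && definedCount == 0 && emptyCount == entities.size();
    boolean shortCheck = errors.isEmpty() && definedCount == 0;
    return new boolean[] {fullCheck, shortCheck};
  }
}
```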



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationUtils.java:
##########
@@ -33,37 +33,72 @@
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.config.workload.InstanceCost;
 import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.config.workload.PropagationEntity;
 import org.apache.pinot.spi.config.workload.PropagationScheme;
 import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * This class provides utility methods for workload propagation.
+ * Provides utility methods for workload propagation in Pinot.
+ *
+ * <p>
+ * This class centralizes logic for resolving Helix tags, instance mappings, 
and matching query
+ * workload configs to their applicable propagation scope.
+ * </p>
  */
 public class PropagationUtils {
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PropagationUtils.class);
   private PropagationUtils() {
   }
 
   /**
-   * Get the mapping tableNameWithType → {BROKER_NODE→brokerTag, 
SERVER_NODE→(serverTag + overrides)}
-   * 1. Get all table configs from the PinotHelixResourceManager
-   * 2. For each table config, extract the tenant config
-   * 3. For each tenant config, get the broker and server tags
-   * 4. Populate the helix tags for BROKER_NODE and SERVER_NODE separately
+   * Builds a mapping from table name with type to Helix tags per node type.
+   *
+   * <p>For each table:</p>
+   * <ol>
+   *   <li>Fetch the {@link TenantConfig}.</li>
+   *   <li>Derive broker and server tags from the tenant configuration.</li>
+   *   <li>Associate {@link NodeConfig.Type#BROKER_NODE} with broker tags.</li>
+   *   <li>Associate {@link NodeConfig.Type#SERVER_NODE} with server tags 
(consuming and/or completed
+   *       for realtime, or offline for batch).</li>
+   * </ol>
+   *
+   * @param pinotResourceManager Resource manager used to fetch table configs.
+   * @return A mapping of tableNameWithType to node type → Helix tags.
    */
   public static Map<String, Map<NodeConfig.Type, Set<String>>> 
getTableToHelixTags(
           PinotHelixResourceManager pinotResourceManager) {
     Map<String, Map<NodeConfig.Type, Set<String>>> tableToTags = new 
HashMap<>();
-    for (TableConfig tableConfig : pinotResourceManager.getAllTableConfigs()) {
+    List<TableConfig> tableConfigs = pinotResourceManager.getAllTableConfigs();
+    if (tableConfigs == null) {
+      LOGGER.warn("No table configs found, returning empty mapping");
+      return tableToTags;
+    }
+    for (TableConfig tableConfig : tableConfigs) {
+      if (tableConfig == null) {

Review Comment:
   Same here. tableConfig can never be null. 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java:
##########
@@ -58,78 +82,186 @@ public QueryWorkloadManager(PinotHelixResourceManager 
pinotHelixResourceManager)
   }
 
   /**
-   * Propagate the workload to the relevant instances based on the 
PropagationScheme
-   * @param queryWorkloadConfig The query workload configuration to propagate
-   * 1. Resolve the instances based on the node type and propagation scheme
-   * 2. Calculate the instance cost for each instance
-   * 3. Send the {@link QueryWorkloadRefreshMessage} to the instances
+   * Propagates an upsert of a workload's cost configuration to all relevant 
instances.
+   *
+   * <p>
+   * For each {@link NodeConfig} in the supplied {@link QueryWorkloadConfig}, 
this method:
+   * </p>
+   * <ol>
+   *   <li>Resolves the {@link PropagationScheme} from the node's configured 
scheme type.</li>
+   *   <li>Computes the per-instance {@link InstanceCost} map using the 
configured
+   *       {@link CostSplitter}.</li>
+   *   <li>Sends a {@link QueryWorkloadRefreshMessage} with subtype
+   *       {@link 
QueryWorkloadRefreshMessage#REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE} to each
+   *       instance with its computed cost.</li>
+   * </ol>
+   *
+   * <p>
+   * This call is idempotent from the manager's perspective: the same inputs 
will result in the
+   * same set of messages being sent. Instances are expected to apply the new 
costs immediately.
+   * </p>
+   *
+   * <p>
+   *  This call is atomic to the extent possible: if any error occurs during 
estimating the target instances
+   *  and their cost. The entire propagation is aborted and no partial updates 
are sent to any instances.
+   * </p>
+   *
+   * <p>
+   *  We rely on Helix reliable messaging to ensure message delivery to 
instances.
+   *  However, if an instance is down during the propagation, it will miss the 
update however, we have logic
+   *  on the instance side to fetch the latest workload configs from 
controller during startup.
+   * </p>
+   *
+   * @param queryWorkloadConfig The workload definition (name, node types, 
budgets, and propagation
+   *                            scheme) to propagate.
    */
   public void propagateWorkloadUpdateMessage(QueryWorkloadConfig 
queryWorkloadConfig) {
     String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
-    for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
-      // Resolve the instances based on the node type and propagation scheme
-      Set<String> instances = resolveInstances(nodeConfig);
-      if (instances.isEmpty()) {
-        String errorMsg = String.format("No instances found for Workload: %s", 
queryWorkloadName);
-        LOGGER.warn(errorMsg);
-        continue;
+    LOGGER.info("Propagating workload update for: {}", queryWorkloadName);
+
+    Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = new 
HashMap<>();
+    try {
+      for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
+        PropagationScheme propagationScheme = 
_propagationSchemeProvider.getPropagationScheme(
+            nodeConfig.getPropagationScheme().getPropagationType());
+        // For propagation entities with empty cpu or memory cost, distribute 
the remaining cost evenly among them
+        checkAndDistributeEmptyPropagationEntitiesEvenly(nodeConfig);
+        Map<String, InstanceCost> instanceCostMap = 
propagationScheme.resolveInstanceCostMap(nodeConfig, _costSplitter);
+        if (instanceCostMap.isEmpty()) {
+          // This is to ensure that the configured entity is valid and maps to 
some instances
+          String errorMsg = String.format("No instances found for workload 
update: %s with nodeConfig: %s",
+              queryWorkloadName, nodeConfig);
+          LOGGER.error(errorMsg);
+          throw new RuntimeException(errorMsg);
+        }
+
+        Map<String, QueryWorkloadRefreshMessage> nodeToRefreshMessageMap = 
instanceCostMap.entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, entry -> new 
QueryWorkloadRefreshMessage(queryWorkloadName,
+                
QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE, 
entry.getValue())));
+        instanceToRefreshMessageMap.putAll(nodeToRefreshMessageMap);
       }
-      Map<String, InstanceCost> instanceCostMap = 
_costSplitter.computeInstanceCostMap(nodeConfig, instances);
-      Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = 
instanceCostMap.entrySet().stream()
-          .collect(Collectors.toMap(Map.Entry::getKey, entry -> new 
QueryWorkloadRefreshMessage(queryWorkloadName,
-              QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE, 
entry.getValue())));
-      // Send the QueryWorkloadRefreshMessage to the instances
-      
_pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+      // Sends the message only after all nodeConfigs are processed 
successfully
+      sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+      LOGGER.info("Successfully propagated workload update for: {} to {} 
instances", queryWorkloadName,
+          instanceToRefreshMessageMap.size());
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to propagate workload update 
for: %s", queryWorkloadName);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
     }
   }
 
   /**
-   * Propagate delete workload refresh message for the given 
queryWorkloadConfig
-   * @param queryWorkloadConfig The query workload configuration to delete
-   * 1. Resolve the instances based on the node type and propagation scheme
-   * 2. Send the {@link QueryWorkloadRefreshMessage} with 
DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE to the instances
+   * Propagates a delete for the given workload to all relevant instances.
+   *
+   * <p>
+   * The method resolves the target instances for each {@link NodeConfig} and 
sends a
+   * {@link QueryWorkloadRefreshMessage} with subtype
+   * {@link QueryWorkloadRefreshMessage#DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE},
+   * which instructs the instance to remove local state associated with the 
workload and stop enforcing costs for it.
+   * </p>
+   *
+   * @param queryWorkloadConfig The workload to delete (only the name and node 
scoping are used).
    */
   public void propagateDeleteWorkloadMessage(QueryWorkloadConfig 
queryWorkloadConfig) {
     String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
+    LOGGER.info("Propagating workload delete for: {}", queryWorkloadName);
+
     for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
-      Set<String> instances = resolveInstances(nodeConfig);
-      if (instances.isEmpty()) {
-        String errorMsg = String.format("No instances found for Workload: %s", 
queryWorkloadName);
-        LOGGER.warn(errorMsg);
+      if (nodeConfig == null) {
+        LOGGER.warn("Skipping null NodeConfig for workload delete: {}", 
queryWorkloadName);
         continue;
       }
-      Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = 
instances.stream()
-          .collect(Collectors.toMap(instance -> instance, instance -> new 
QueryWorkloadRefreshMessage(queryWorkloadName,
-              QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE, 
null)));
-      
_pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+      try {
+        Set<String> instances = resolveInstances(nodeConfig);
+        if (instances.isEmpty()) {
+          LOGGER.warn("No instances found for workload delete: {} with 
nodeConfig: {}", queryWorkloadName, nodeConfig);
+          continue;
+        }
+        QueryWorkloadRefreshMessage deleteMessage = new 
QueryWorkloadRefreshMessage(queryWorkloadName,
+            QueryWorkloadRefreshMessage.DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE, 
new InstanceCost(0, 0));
+        Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = 
instances.stream()
+            .collect(Collectors.toMap(instance -> instance, instance -> 
deleteMessage));
+
+        // Send the QueryWorkloadRefreshMessage to the instances
+       sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+        LOGGER.info("Successfully propagated workload delete for: {} to {} 
instances", queryWorkloadName,
+            instances.size());
+      } catch (Exception e) {
+        String errorMsg = String.format("Failed to propagate workload delete 
for: %s with nodeConfig: %s",
+            queryWorkloadName, nodeConfig);
+        LOGGER.error(errorMsg, e);
+        throw new RuntimeException(errorMsg, e);
+      }
     }
   }
 
   /**
-   * Propagate the workload for the given table name, it does fast exits if 
queryWorkloadConfigs is empty
-   * @param tableName The table name to propagate the workload for, it can be 
a rawTableName or a tableNameWithType
-   * if rawTableName is provided, it will resolve all available tableTypes and 
propagate the workload for each tableType
-   *
-   * This method performs the following steps:
-   * 1. Find all the helix tags associated with the table
-   * 2. Find all the {@link QueryWorkloadConfig} associated with the helix tags
-   * 3. Propagate the workload cost for instances associated with the workloads
+   * Propagates workload updates for all workloads that apply to the given 
table.
+   *
+   * <p>
+   * This helper performs the following:
+   * </p>
+   * <ol>
+   *   <li>Fetches all {@link QueryWorkloadConfig}s from Zookeeper.</li>
+   *   <li>Resolves the Helix tags associated with the table (supports raw 
table names and
+   *       type-qualified names).</li>
+   *   <li>Filters the workload configs to those whose scope matches the 
table's tags.</li>
+   *   <li>Invokes {@link 
#propagateWorkloadUpdateMessage(QueryWorkloadConfig)} for each match.</li>
+   * </ol>
+   *
+   * <p>
+   * If no workloads are configured, the method returns immediately. Any 
exception encountered is
+   * logged and rethrown as a {@link RuntimeException}.
+   * </p>
+   *
+   * @param tableName The raw or type-qualified table name (e.g., {@code 
myTable} or
+   *                  {@code myTable_OFFLINE}).
+   * @throws RuntimeException If propagation fails due to Helix/ZK access or 
message dispatch
+   *                          errors.
    */
   public void propagateWorkloadFor(String tableName) {
     try {
       List<QueryWorkloadConfig> queryWorkloadConfigs = 
_pinotHelixResourceManager.getAllQueryWorkloadConfigs();
       if (queryWorkloadConfigs.isEmpty()) {
-          return;
+        return;
       }
       // Get the helixTags associated with the table
       List<String> helixTags = 
PropagationUtils.getHelixTagsForTable(_pinotHelixResourceManager, tableName);
+      if (helixTags.isEmpty()) {
+        LOGGER.warn("No Helix tags found for table: {}, skipping workload 
propagation", tableName);
+        return;
+      }
+
       // Find all workloads associated with the helix tags
       Set<QueryWorkloadConfig> queryWorkloadConfigsForTags =
           
PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, 
helixTags, queryWorkloadConfigs);
+
+      if (queryWorkloadConfigsForTags.isEmpty()) {
+        LOGGER.info("No workload configs match table: {}, no propagation 
needed", tableName);
+        return;
+      }
+
       // Propagate the workload for each QueryWorkloadConfig
+      int successCount = 0;
       for (QueryWorkloadConfig queryWorkloadConfig : 
queryWorkloadConfigsForTags) {
-        propagateWorkloadUpdateMessage(queryWorkloadConfig);
+        try {
+          List<String> errors = 
QueryWorkloadConfigUtils.validateQueryWorkloadConfig(queryWorkloadConfig);

Review Comment:
   I would suggest that you rethink whether this is needed. As I said, this 
   validation happens too late to be useful. I will leave the final decision to you. 
   
   Having validation for manual edits is unnecessary in my opinion, as the 
   operator really knows what they are doing. 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationUtils.java:
##########
@@ -201,11 +291,51 @@ public static Set<QueryWorkloadConfig> 
getQueryWorkloadConfigsForTags(
     return matchedConfigs;
   }
 
+  /**
+   * Returns all possible Helix tags for a given tenant name.
+   *
+   * <p>This includes broker, offline, and realtime tags.</p>
+   *
+   * @param tenantName Tenant name.
+   * @return A list of Helix tags for the tenant.
+   */
   private static List<String> getAllPossibleHelixTagsFor(String tenantName) {
     List<String> helixTags = new ArrayList<>();
     helixTags.add(TagNameUtils.getBrokerTagForTenant(tenantName));
     helixTags.add(TagNameUtils.getOfflineTagForTenant(tenantName));
     helixTags.add(TagNameUtils.getRealtimeTagForTenant(tenantName));
     return helixTags;
   }
+
+  /**
+   * Extracts all top-level cost IDs from the given propagation scheme.

Review Comment:
   Can we just call this propagationEntities and say it will not fetch 
overrides? "Top-level cost IDs" seems to be a new nomenclature we are 
introducing here. 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java:
##########
@@ -58,108 +84,237 @@ public QueryWorkloadManager(PinotHelixResourceManager 
pinotHelixResourceManager)
   }
 
   /**
-   * Propagate the workload to the relevant instances based on the 
PropagationScheme
-   * @param queryWorkloadConfig The query workload configuration to propagate
-   * 1. Resolve the instances based on the node type and propagation scheme
-   * 2. Calculate the instance cost for each instance
-   * 3. Send the {@link QueryWorkloadRefreshMessage} to the instances
+   * Propagates an upsert of a workload's cost configuration to all relevant 
instances.
+   *
+   * <p>
+   * For each {@link NodeConfig} in the supplied {@link QueryWorkloadConfig}, 
this method:
+   * </p>
+   * <ol>
+   *   <li>Resolves the {@link PropagationScheme} from the node's configured 
scheme type.</li>
+   *   <li>Computes the per-instance {@link InstanceCost} map using the 
configured
+   *       {@link CostSplitter}.</li>
+   *   <li>Sends a {@link QueryWorkloadRefreshMessage} with subtype
+   *       {@link 
QueryWorkloadRefreshMessage#REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE} to each
+   *       instance with its computed cost.</li>
+   * </ol>
+   *
+   * <p>
+   * This call is idempotent from the manager's perspective: the same inputs 
will result in the
+   * same set of messages being sent. Instances are expected to apply the new 
costs immediately.
+   * </p>
+   *
+   * <p>
+   *  This call is atomic to the extent possible: if any error occurs while 
estimating the target instances
+   *  and their costs, the entire propagation is aborted and no partial updates 
are sent to any instance.
+   * </p>
+   *
+   * <p>
+   *  We rely on Helix reliable messaging to ensure message delivery to 
instances.
+   *  If an instance is down during the propagation, it will miss the 
update; however, logic
+   *  on the instance side fetches the latest workload configs from the 
controller during startup.
+   * </p>
+   *
+   * @param queryWorkloadConfig The workload definition (name, node types, 
budgets, and propagation
+   *                            scheme) to propagate.
    */
   public void propagateWorkloadUpdateMessage(QueryWorkloadConfig 
queryWorkloadConfig) {
     String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
-    for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
-      // Resolve the instances based on the node type and propagation scheme
-      Set<String> instances = resolveInstances(nodeConfig);
-      if (instances.isEmpty()) {
-        String errorMsg = String.format("No instances found for Workload: %s", 
queryWorkloadName);
-        LOGGER.warn(errorMsg);
-        continue;
+    LOGGER.info("Propagating workload update for: {}", queryWorkloadName);
+
+    Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = new 
HashMap<>();
+    try {
+      Map<String, InstanceCost> workloadInstanceCostMap = new HashMap<>();
+      for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
+        resolveInstanceCostMap(nodeConfig, workloadInstanceCostMap);
       }
-      Map<String, InstanceCost> instanceCostMap = 
_costSplitter.computeInstanceCostMap(nodeConfig, instances);
-      Map<String, QueryWorkloadRefreshMessage> instanceToRefreshMessageMap = 
instanceCostMap.entrySet().stream()
+      Map<String, QueryWorkloadRefreshMessage> nodeToRefreshMessageMap = 
workloadInstanceCostMap.entrySet().stream()
           .collect(Collectors.toMap(Map.Entry::getKey, entry -> new 
QueryWorkloadRefreshMessage(queryWorkloadName,
               QueryWorkloadRefreshMessage.REFRESH_QUERY_WORKLOAD_MSG_SUB_TYPE, 
entry.getValue())));
-      // Send the QueryWorkloadRefreshMessage to the instances
-      
_pinotHelixResourceManager.sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+      instanceToRefreshMessageMap.putAll(nodeToRefreshMessageMap);
+      // Sends the message only after all nodeConfigs are processed 
successfully
+      sendQueryWorkloadRefreshMessage(instanceToRefreshMessageMap);
+      LOGGER.info("Successfully propagated workload update for: {} to {} 
instances", queryWorkloadName,
+          instanceToRefreshMessageMap.size());
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to propagate workload update 
for: %s", queryWorkloadName);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+  }
+
+  private void resolveInstanceCostMap(NodeConfig nodeConfig, Map<String, 
InstanceCost> instanceCostMap) {
+    PropagationScheme propagationScheme = 
_propagationSchemeProvider.getPropagationScheme(
+        nodeConfig.getPropagationScheme().getPropagationType());
+    for (PropagationEntity entity : 
nodeConfig.getPropagationScheme().getPropagationEntities()) {
+      if (entity.getOverrides() != null && 
propagationScheme.isOverrideSupported(entity)) {
+        List<PropagationEntityOverrides> overrides = entity.getOverrides();
+        // Apply each override separately and aggregate the instance costs
+        for (PropagationEntityOverrides override : overrides) {
+          resolveAndAggregateInstanceCosts(propagationScheme, entity, 
override, nodeConfig.getNodeType(),
+              instanceCostMap);
+        }
+      } else {
+        resolveAndAggregateInstanceCosts(propagationScheme, entity, null, 
nodeConfig.getNodeType(),
+            instanceCostMap);
+      }
     }
   }
 
+  private void resolveAndAggregateInstanceCosts(PropagationScheme 
propagationScheme,
+                                                PropagationEntity entity, 
PropagationEntityOverrides override,
+                                                NodeConfig.Type nodeType,
+                                                Map<String, InstanceCost> 
workloadInstanceCostMap) {
+    Set<String> instances = propagationScheme.resolveInstances(entity, 
nodeType, override);
+    Map<String, InstanceCost> entityInstanceCostMap = 
_costSplitter.computeInstanceCostMap(entity.getCpuCostNs(),
+        entity.getMemoryCostBytes(), instances);
+    PropagationUtils.mergeCosts(workloadInstanceCostMap, 
entityInstanceCostMap);
+  }
+
   /**
-   * Propagate delete workload refresh message for the given 
queryWorkloadConfig
-   * @param queryWorkloadConfig The query workload configuration to delete
-   * 1. Resolve the instances based on the node type and propagation scheme
-   * 2. Send the {@link QueryWorkloadRefreshMessage} with 
DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE to the instances
+   * Propagates a delete for the given workload to all relevant instances.
+   *
+   * <p>
+   * The method resolves the target instances for each {@link NodeConfig} and 
sends a
+   * {@link QueryWorkloadRefreshMessage} with subtype
+   * {@link QueryWorkloadRefreshMessage#DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE},
+   * which instructs the instance to remove local state associated with the 
workload and stop enforcing costs for it.
+   * </p>
+   *
+   * @param queryWorkloadConfig The workload to delete (only the name and node 
scoping are used).
    */
   public void propagateDeleteWorkloadMessage(QueryWorkloadConfig 
queryWorkloadConfig) {
     String queryWorkloadName = queryWorkloadConfig.getQueryWorkloadName();
-    for (NodeConfig nodeConfig: queryWorkloadConfig.getNodeConfigs()) {
-      Set<String> instances = resolveInstances(nodeConfig);
-      if (instances.isEmpty()) {
-        String errorMsg = String.format("No instances found for Workload: %s", 
queryWorkloadName);
-        LOGGER.warn(errorMsg);
-        continue;
+    LOGGER.info("Propagating workload delete for: {}", queryWorkloadName);

Review Comment:
   Let's say I initially have a workload defined with Table PropagationType 
where Table1 and Table2 are the entities (assume no hosts are common between 
Table1 and Table2).
   
   Now, if I update the workload to remove Table2 from the propagation 
entities, do we handle it by sending a deleteWorkloadMessage to the hosts of 
Table2? 
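   
   To make the scenario concrete, here is a rough sketch of the kind of diff 
I have in mind. `StaleInstanceSketch` and `staleInstances` are hypothetical 
names that do not exist in this PR, and the sketch assumes the instance sets 
for both the previous and the updated config can be resolved before sending.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper, not part of the PR: finds instances that dropped out of a workload's scope. */
final class StaleInstanceSketch {
  private StaleInstanceSketch() {
  }

  /**
   * Returns the instances that were resolved for the previous config but are no longer
   * resolved for the updated one. Under the assumption above, these are the hosts that
   * would need a delete message rather than a refresh message.
   */
  static Set<String> staleInstances(Set<String> previousInstances, Set<String> currentInstances) {
    // Copy first so the caller's set is not mutated.
    Set<String> stale = new HashSet<>(previousInstances);
    stale.removeAll(currentInstances);
    return stale;
  }
}
```

   With the previous config resolving to hosts {h1, h2, h3, h4} and the updated 
one to {h1, h2}, the stale set is {h3, h4}. Those hosts would presumably get 
DELETE_QUERY_WORKLOAD_MSG_SUB_TYPE while the rest get the refresh.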




