vvivekiyer commented on code in PR #16672:
URL: https://github.com/apache/pinot/pull/16672#discussion_r2350267909


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryWorkloadRestletResource.java:
##########
@@ -94,38 +94,65 @@ public String getQueryWorkloadConfigs(@Context HttpHeaders httpHeaders) {
   }
 
   /**
-   * API to fetch query workload config
-   * @param queryWorkloadName Name of the query workload
-   * Example request:
-   * /queryWorkloadConfigs/workload-foo1
-   * Example response:
+   * Retrieves the query workload configuration for the specified workload name.
+   * <p>
+   * This API returns the detailed configuration including node-specific settings,
+   * enforcement profiles, propagation schemes, and cost splits.
+   * </p>
+   * <p><strong>Example:</strong></p>
+   * <pre>{@code
    * {
-   *   "queryWorkloadName" : "workload-foo1",
-   *   "nodeConfigs" : {
-   *   {
-   *       "nodeType" : "brokerNode",
+   *   "queryWorkloadName": "workload-foo1",
+   *   "nodeConfigs": [
+   *     {
+   *       "nodeType": "brokerNode",
    *       "enforcementProfile": {
    *         "cpuCostNs": 500,
    *         "memoryCostBytes": 1000
    *       },
    *       "propagationScheme": {
    *         "propagationType": "TABLE",
-   *         "values": ["airlineStats"]
+   *         "costSplits": [
+   *           {
+   *             "costId": "airlineStats",

Review Comment:
   costId as the key is confusing. Can we use something like "PropagationEntity"?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/CostSplit.java:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+public class CostSplit extends BaseJsonConfig {
+  @JsonPropertyDescription("Unique identifier for cost allocation; could be a table, tenant, etc.")
+  private String _costId;
+
+  @JsonPropertyDescription("Max CPU cost allowed for the cost id, if not specified, inherited from parent")
+  private Long _cpuCostNs;
+
+  @JsonPropertyDescription("Max memory cost allowed for the cost id, if not specified, inherited from parent")
+  private Long _memoryCostBytes;
+
+  /**
+   * Optional nested allocations. Omitted or empty when not used.
+   * Could be used to represent sub-allocations within the cost id,
+   * such as allocations for CONSUMING/COMPLETED tenants within a realtime table.
+   */
+  @JsonPropertyDescription("Optional nested allocations for the cost id")
+  private List<CostSplit> _subAllocations;

Review Comment:
   If I define sub-allocations for CONSUMING/COMPLETED tenants within a Table Propagation scheme, how would the splitter know that each sub-allocation is a tenant? I hope the code does not assume that these sub-allocations are tenants for a Table Propagation scheme.



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java:
##########
@@ -227,4 +240,62 @@ private SegmentConsumerInfo getSegmentConsumerInfo(SegmentDataManager segmentDat
     }
     return segmentConsumerInfo;
   }
+
+  /**
+   * Get the instance cost (budget) information for a specific workload.
+   * Returns the CPU and memory budget information enforced on this instance for the workload.
+   * Example response:
+   * {
+   *  "workloadName": "testWorkload",
+   *  "cpuCostNs": 5000000,
+   *  "memoryCostBytes": 104857600
+   *  }
+   *  If the workload is not found, returns 404.
+   *  If the WorkloadBudgetManager is not available, returns 500.
+   *  If the workload name is invalid, returns 400.
+   */
+  @GET
+  @Path("queryWorkloadCost/{workloadName}")
+  @ApiOperation(value = "Get instance cost information for a specific workload")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 404, message = "Workload not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  @Produces(MediaType.APPLICATION_JSON)
+  public String getWorkloadCost(
+      @ApiParam(value = "Name of the workload", required = true) @PathParam("workloadName") String workloadName
+  ) {
+    // Input validation
+    if (workloadName == null || workloadName.trim().isEmpty()) {
+      throw new WebApplicationException("Workload name cannot be null or empty",
+          Response.Status.BAD_REQUEST);
+    }
+    try {
+      WorkloadBudgetManager workloadBudgetManager = Tracing.ThreadAccountantOps.getWorkloadBudgetManager();
+      if (workloadBudgetManager == null) {
+        LOGGER.warn("WorkloadBudgetManager is not available on instance: {}", _instanceId);

Review Comment:
   Do we need these LOGGERs given that the response already indicates the right error?



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java:
##########
@@ -226,9 +229,212 @@ public static List<String> validateQueryWorkloadConfig(QueryWorkloadConfig confi
           if (propagationScheme.getPropagationType() == null) {
             errors.add(prefix + ".propagationScheme.type cannot be null");
           }
+          // Validate CostSplits
+          validateCostSplits(propagationScheme.getCostSplits(), prefix + ".propagationScheme.costSplits", errors);
         }
       }
     }
     return errors;
   }
+
+  /**
+   * Validates a list of CostSplit objects and their nested sub-allocations.
+   *
+   * @param costSplits the list of CostSplit objects to validate
+   * @param prefix the prefix for error messages
+   * @param errors the list to add validation errors to
+   */
+  private static void validateCostSplits(List<CostSplit> costSplits, String prefix, List<String> errors) {
+    if (costSplits == null) {
+      errors.add(prefix + " cannot be null");
+      return;
+    }
+
+    if (costSplits.isEmpty()) {

Review Comment:
   nit: group the condition in L248 and this one into a single check.
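The grouped check the nit suggests can be sketched as follows; the class and method names here are illustrative, not from the PR, and note that merging the branches also merges the two distinct error messages into one:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CostSplitGuard {
  // Single guard covering both the null and the empty case in one branch.
  public static List<String> checkNotNullOrEmpty(List<?> costSplits, String prefix) {
    List<String> errors = new ArrayList<>();
    if (costSplits == null || costSplits.isEmpty()) {
      errors.add(prefix + " cannot be null or empty");
    }
    return errors;
  }

  public static void main(String[] args) {
    // Both invalid inputs produce the same single message; a valid list produces none.
    System.out.println(checkNotNullOrEmpty(null, "costSplits"));
    System.out.println(checkNotNullOrEmpty(Collections.emptyList(), "costSplits"));
    System.out.println(checkNotNullOrEmpty(List.of("split"), "costSplits"));
  }
}
```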



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TablePropagationScheme.java:
##########
@@ -19,60 +19,192 @@
 package org.apache.pinot.controller.workload.scheme;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.workload.splitter.CostSplitter;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.workload.CostSplit;
+import org.apache.pinot.spi.config.workload.InstanceCost;
 import org.apache.pinot.spi.config.workload.NodeConfig;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 /**
- * TablePropagationScheme is used to resolve instances based on the {@link NodeConfig} and {@link NodeConfig.Type}
- * It resolves the instances based on the table names specified in the node configuration
+ * A {@code TablePropagationScheme} resolves Pinot instances based on table names in a node
+ * configuration.
+ *
+ * <p>
+ * This scheme looks up Helix tags for offline and realtime tables and maps them to
+ * instances, enabling workload propagation by table.
+ * </p>
  */
 public class TablePropagationScheme implements PropagationScheme {
 
-  private static PinotHelixResourceManager _pinotHelixResourceManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
 
   public TablePropagationScheme(PinotHelixResourceManager pinotHelixResourceManager) {
     _pinotHelixResourceManager = pinotHelixResourceManager;
   }
 
-  @Override
+  /**
+   * Resolves the union of all instances across all cost splits for the given node config.
+   *
+   * @param nodeConfig Node configuration containing propagation scheme and cost splits.
+   * @return A set of instance names that should receive workload messages.
+   * @throws IllegalArgumentException If no instances are found for a cost split.
+   */
   public Set<String> resolveInstances(NodeConfig nodeConfig) {
     Set<String> instances = new HashSet<>();
-    List<String> tableNames = nodeConfig.getPropagationScheme().getValues();
-    Map<String, Map<NodeConfig.Type, Set<String>>> tableWithTypeToHelixTags
-            = PropagationUtils.getTableToHelixTags(_pinotHelixResourceManager);
-    Map<String, Set<String>> helixTagToInstances
-            = PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager);
-    for (String tableName : tableNames) {
-      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
-      List<String> tablesWithType = new ArrayList<>();
-      if (tableType == null) {
-        // Get both offline and realtime table names if type is not present.
-        tablesWithType.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
-        tablesWithType.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
-      } else {
-        tablesWithType.add(tableName);
+    Map<String, Map<NodeConfig.Type, Set<String>>> tableWithTypeToHelixTags =
+        PropagationUtils.getTableToHelixTags(_pinotHelixResourceManager);
+    Map<String, Set<String>> helixTagToInstances =
+        PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager);
+
+    NodeConfig.Type nodeType = nodeConfig.getNodeType();
+    for (CostSplit costSplit : nodeConfig.getPropagationScheme().getCostSplits()) {
+      Map<String, Set<String>> byTag = resolveInstancesByHelixTag(costSplit, nodeType, tableWithTypeToHelixTags,
+          helixTagToInstances);
+      if (!byTag.isEmpty()) {
+        for (Set<String> set : byTag.values()) {
+          instances.addAll(set);
+        }
       }
-      for (String tableWithType : tablesWithType) {
-        Map<NodeConfig.Type, Set<String>> nodeToHelixTags = tableWithTypeToHelixTags.get(tableWithType);
-        if (nodeToHelixTags != null) {
-          Set<String> helixTags = nodeToHelixTags.get(nodeConfig.getNodeType());
-          if (helixTags != null) {
-            for (String helixTag : helixTags) {
-              Set<String> helixInstances = helixTagToInstances.get(helixTag);
-              if (helixInstances != null) {
-                instances.addAll(helixInstances);
-              }
+    }
+    return instances;
+  }
+
+  /**
+   * Computes the per-instance cost map for the given node config using the provided splitter.
+   *
+   * <p>
+   * This method supports sub-allocations: the cost-ids in the sub-allocations are the helix tags.

Review Comment:
   For TablePropagationScheme:
   1. Each costId is a Table
   2. Each sub-allocation is a Helix Tag aka Tenant? This seems odd. Can we just make the sub-allocation a tableType (with some special-case logic to handle different costs for consuming/completed tenants)?
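For comparison, a sketch of what the tableType-keyed alternative might look like; the `tableType` field is hypothetical and does not exist in this PR:

```json
{
  "costId": "airlineStats",
  "cpuCostNs": 500,
  "memoryCostBytes": 1000,
  "subAllocations": [
    {"tableType": "CONSUMING", "cpuCostNs": 300, "memoryCostBytes": 600},
    {"tableType": "COMPLETED", "cpuCostNs": 200, "memoryCostBytes": 400}
  ]
}
```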



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationUtils.java:
##########
@@ -101,74 +147,129 @@ private static void collectHelixTagsForTable(List<String> tags, TenantConfig ten
   }
 
   /**
-   * Get the helix tags for a given table name.
-   * If the table name does not have a type suffix, it will return both offline and realtime tags.
+   * Resolves Helix tags for a table.
+   *
+   * <p>
+   * If the input table name lacks a type suffix, both offline and realtime table names are expanded
+   * and resolved. Otherwise, the specific table name is used.
+   * </p>
+   *
+   * @param pinotResourceManager Resource manager to fetch table configs.
+   * @param tableName The raw or type-qualified table name.
+   * @return A list of Helix tags associated with the table.
    */
   public static List<String> getHelixTagsForTable(PinotHelixResourceManager pinotResourceManager, String tableName) {
+    if (tableName == null || tableName.trim().isEmpty()) {
+      throw new IllegalArgumentException("Table name cannot be null or empty");
+    }
+
     List<String> combinedTags = new ArrayList<>();
     TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
     List<String> tablesWithType = (tableType == null)
         ? Arrays.asList(TableNameBuilder.OFFLINE.tableNameWithType(tableName),
             TableNameBuilder.REALTIME.tableNameWithType(tableName))
         : Collections.singletonList(tableName);
     for (String table : tablesWithType) {
-      TableConfig tableConfig = pinotResourceManager.getTableConfig(table);
-      if (tableConfig != null) {
-        collectHelixTagsForTable(combinedTags, tableConfig.getTenantConfig(), tableConfig.getTableType());
+      try {
+        TableConfig tableConfig = pinotResourceManager.getTableConfig(table);
+        if (tableConfig != null && tableConfig.getTenantConfig() != null && tableConfig.getTableType() != null) {
+          collectHelixTagsForTable(combinedTags, tableConfig.getTenantConfig(), tableConfig.getTableType());
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to get table config for table: " + table, e);
       }
     }
     return combinedTags;
   }
 
   /**
-   * Get the mapping between helix tag -> instances
+   * Builds a mapping from Helix tag to the set of instances carrying that tag.
+   *
+   * @param pinotResourceManager Resource manager used to fetch instance configs.
+   * @return A mapping of Helix tag → set of instance names.
    */
   public static Map<String, Set<String>> getHelixTagToInstances(PinotHelixResourceManager pinotResourceManager) {
     Map<String, Set<String>> tagToInstances = new HashMap<>();
-    for (InstanceConfig instanceConfig : pinotResourceManager.getAllHelixInstanceConfigs()) {
-      String instanceName = instanceConfig.getInstanceName();
-      for (String helixTag : instanceConfig.getTags()) {
-        tagToInstances.computeIfAbsent(helixTag, tag -> new HashSet<>()).add(instanceName);
+    try {
+      List<InstanceConfig> instanceConfigs = pinotResourceManager.getAllHelixInstanceConfigs();
+      if (instanceConfigs == null) {
+        LOGGER.warn("No instance configs found, returning empty mapping");
+        return tagToInstances;
       }
+
+      for (InstanceConfig instanceConfig : instanceConfigs) {
+        if (instanceConfig == null) {
+          LOGGER.warn("Skipping null instance config");
+          continue;
+        }
+        String instanceName = instanceConfig.getInstanceName();
+        if (instanceName == null || instanceName.trim().isEmpty()) {

Review Comment:
   I see a number of additional validations added. Are they all necessary? For example, can there be an instance with a null or empty name?



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java:
##########
@@ -227,4 +240,62 @@ private SegmentConsumerInfo getSegmentConsumerInfo(SegmentDataManager segmentDat
     }
     return segmentConsumerInfo;
   }
+
+  /**
+   * Get the instance cost (budget) information for a specific workload.

Review Comment:
   While we are adding debug APIs, let's also add one to get all the workloads and their initial+remaining costs for an instance.
   
   This will be useful to debug whether the propagation has gone through or not.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java:
##########
@@ -178,6 +178,14 @@ public BudgetStats getRemainingBudgetAcrossAllWorkloads() {
     return new BudgetStats(totalCpuBudget, totalMemoryBudget, totalCpuRemaining, totalMemRemaining);
   }
 
+  public BudgetStats getBudgetStats(String workload) {

Review Comment:
   We already have getRemainingBudgetForWorkload(). How is this different, and why is it needed?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryWorkloadRestletResource.java:
##########
@@ -94,38 +94,65 @@ public String getQueryWorkloadConfigs(@Context HttpHeaders httpHeaders) {
   }
 
   /**
-   * API to fetch query workload config
-   * @param queryWorkloadName Name of the query workload
-   * Example request:
-   * /queryWorkloadConfigs/workload-foo1
-   * Example response:
+   * Retrieves the query workload configuration for the specified workload name.
+   * <p>
+   * This API returns the detailed configuration including node-specific settings,
+   * enforcement profiles, propagation schemes, and cost splits.
+   * </p>
+   * <p><strong>Example:</strong></p>
+   * <pre>{@code
    * {
-   *   "queryWorkloadName" : "workload-foo1",
-   *   "nodeConfigs" : {
-   *   {
-   *       "nodeType" : "brokerNode",
+   *   "queryWorkloadName": "workload-foo1",
+   *   "nodeConfigs": [
+   *     {
+   *       "nodeType": "brokerNode",
    *       "enforcementProfile": {
    *         "cpuCostNs": 500,
    *         "memoryCostBytes": 1000
    *       },
    *       "propagationScheme": {
    *         "propagationType": "TABLE",
-   *         "values": ["airlineStats"]
+   *         "costSplits": [
+   *           {
+   *             "costId": "airlineStats",

Review Comment:
   Can you add some comments to clarify what each CostId/PropagationEntity should be?
   For example: if propagationType is TABLE, each CostId is expected to be a valid TableName, and so on.
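A sketch of the kind of doc comment being requested; the wording (and the TENANT case) is illustrative, not taken from the PR:

```java
// Illustrative javadoc for PropagationScheme.costSplits (wording is a suggestion):
//
// The interpretation of each CostSplit.costId depends on propagationType:
//  - TABLE:  costId must be a valid table name, optionally type-qualified
//            (e.g. "airlineStats" or "airlineStats_OFFLINE").
//  - TENANT: costId must be a valid tenant name (hypothetical example).
```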



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryWorkloadIntegrationTest.java:
##########
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.workload.CostSplit;
+import org.apache.pinot.spi.config.workload.EnforcementProfile;
+import org.apache.pinot.spi.config.workload.NodeConfig;
+import org.apache.pinot.spi.config.workload.PropagationScheme;
+import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class QueryWorkloadIntegrationTest extends BaseClusterIntegrationTest {
+  private static final int NUM_OFFLINE_SEGMENTS = 8;
+  private static final int NUM_REALTIME_SEGMENTS = 6;
+
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration configuration) {
+    configuration.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
+        + CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENABLE_COST_COLLECTION, true);
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration configuration) {
+    configuration.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
+        + CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENABLE_COST_COLLECTION, true);
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start Zk, Kafka and Pinot
+    startZk();
+    startController();
+    startBroker();
+    Tracing.unregisterThreadAccountant();
+    startServer();
+    startKafka();
+
+    List<File> avroFiles = getAllAvroFiles();
+    List<File> offlineAvroFiles = getOfflineAvroFiles(avroFiles, NUM_OFFLINE_SEGMENTS);
+    List<File> realtimeAvroFiles = getRealtimeAvroFiles(avroFiles, NUM_REALTIME_SEGMENTS);
+
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig offlineTableConfig = createOfflineTableConfig();
+    addTableConfig(offlineTableConfig);
+    addTableConfig(createRealtimeTableConfig(realtimeAvroFiles.get(0)));
+
+    // Create and upload segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir,
+        _tarDir);
+    uploadSegments(getTableName(), _tarDir);
+
+    // Push data into Kafka
+    pushAvroIntoKafka(realtimeAvroFiles);
+
+    // Set up the H2 connection
+    setUpH2Connection(avroFiles);
+
+    // Initialize the query generator
+    setUpQueryGenerator(avroFiles);
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(100_000L);
+  }
+
+  // TODO: Expand tests to cover more scenarios for workload enforcement
+  @Test
+  public void testQueryWorkloadConfig() throws Exception {
+    EnforcementProfile enforcementProfile = new EnforcementProfile(1000, 1000);
+    CostSplit costSplit = new CostSplit(DEFAULT_TABLE_NAME + "_OFFLINE", 1000L, 1000L, null);
+    PropagationScheme propagationScheme = new PropagationScheme(PropagationScheme.Type.TABLE, List.of(costSplit));
+    NodeConfig nodeConfig = new NodeConfig(NodeConfig.Type.SERVER_NODE, enforcementProfile, propagationScheme);
+    QueryWorkloadConfig queryWorkloadConfig = new QueryWorkloadConfig("testWorkload", List.of(nodeConfig));
+    try {
+      getControllerRequestClient().updateQueryWorkloadConfig(queryWorkloadConfig);
+      TestUtils.waitForCondition(aVoid -> {
+        try {
+          QueryWorkloadConfig retrievedConfig = getControllerRequestClient().getQueryWorkloadConfig("testWorkload");
+          return retrievedConfig != null && retrievedConfig.equals(queryWorkloadConfig);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }, 60_000L, "Failed to retrieve the created query workload config");
+      // Get server instances that actually serve this specific table
+      String tableName = getTableName();
+      Set<String> serverInstances = getServerInstancesForTable(tableName);
+      long expectedCpuCostNs = costSplit.getCpuCostNs() / serverInstances.size();
+      long expectedMemoryCostBytes = costSplit.getMemoryCostBytes() / serverInstances.size();
+      // Test calling the endpoints on each server that serves this table
+      for (String serverInstance : serverInstances) {
+        testServerQueryWorkloadEndpoints(serverInstance, "testWorkload", expectedCpuCostNs, expectedMemoryCostBytes);
+      }
+    } finally {
+      getControllerRequestClient().deleteQueryWorkloadConfig("testWorkload");
+    }
+  }
+
+  /**
+   * Test QueryWorkloadResource endpoints on a specific server instance for a specific workload
+   */
+  private void testServerQueryWorkloadEndpoints(String serverInstance, String workloadName,
+                                                long expectedCpuBudgetNs, long expectedMemoryBudgetBytes)
+      throws Exception {
+    // Extract host from server instance name (format: Server_hostname_port)
+    String[] parts = serverInstance.split("_");
+    String host = parts[1];
+
+    // Use the proper admin API port (not the netty port from instance name)
+    String serverBaseApiUrl = "http://" + host + ":" + getServerAdminApiPort();
+
+    // Test the get specific workload endpoint (GET /queryWorkloadCost/{workloadName})
+    String getWorkloadUrl = serverBaseApiUrl + "/debug/queryWorkloadCost/" + workloadName;
+    String workloadResponse = sendGetRequest(getWorkloadUrl);
+
+    // Verify response is valid JSON and contains InstanceCost structure
+    JsonNode workloadResponseJson = JsonUtils.stringToJsonNode(workloadResponse);
+    assertNotNull(workloadResponseJson);
+    long actualCpuCostNs = workloadResponseJson.get("cpuBudgetNs").asLong();
+    long actualMemoryCostBytes = workloadResponseJson.get("memoryBudgetBytes").asLong();
+    assertEquals(actualCpuCostNs, expectedCpuBudgetNs);
+    assertEquals(actualMemoryCostBytes, expectedMemoryBudgetBytes);
+  }
+
+  /**
+   * Get the definitive list of server instances that serve a specific table

Review Comment:
   Would this be useful given that we only have 1 server in the integration test?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/workload/PropagationScheme.java:
##########
@@ -115,9 +132,9 @@ public static Type forValue(String value) {
    */
   @JsonCreator
+  public PropagationScheme(@JsonProperty(PROPAGATION_TYPE) Type propagationType,
-      @JsonProperty(VALUES) List<String> values) {
+      @Nullable @JsonProperty(COST_SPLITS) List<CostSplit> costSplits) {

Review Comment:
   Can costSplits be empty? Looks like it should be required.
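If costSplits is indeed required, the constructor could fail fast instead of accepting a @Nullable list; a minimal sketch with a stand-in class (element type simplified to String, not the PR's CostSplit):

```java
import java.util.List;

public class RequiredCostSplits {
  private final List<String> _costSplits;

  // Fail fast: reject a null or empty list at construction time.
  public RequiredCostSplits(List<String> costSplits) {
    if (costSplits == null || costSplits.isEmpty()) {
      throw new IllegalArgumentException("costSplits must be a non-empty list");
    }
    _costSplits = costSplits;
  }

  public List<String> getCostSplits() {
    return _costSplits;
  }

  public static void main(String[] args) {
    try {
      new RequiredCostSplits(List.of());
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
    System.out.println(new RequiredCostSplits(List.of("airlineStats")).getCostSplits());
  }
}
```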



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java:
##########
@@ -226,9 +229,212 @@ public static List<String> validateQueryWorkloadConfig(QueryWorkloadConfig confi
           if (propagationScheme.getPropagationType() == null) {
             errors.add(prefix + ".propagationScheme.type cannot be null");
           }
+          // Validate CostSplits
+          validateCostSplits(propagationScheme.getCostSplits(), prefix + ".propagationScheme.costSplits", errors);
         }
       }
     }
     return errors;
   }
+
+  /**
+   * Validates a list of CostSplit objects and their nested sub-allocations.
+   *
+   * @param costSplits the list of CostSplit objects to validate
+   * @param prefix the prefix for error messages
+   * @param errors the list to add validation errors to
+   */
+  private static void validateCostSplits(List<CostSplit> costSplits, String prefix, List<String> errors) {
+    if (costSplits == null) {
+      errors.add(prefix + " cannot be null");
+      return;
+    }
+
+    if (costSplits.isEmpty()) {
+      errors.add(prefix + " cannot be empty");
+      return;
+    }
+
+    Set<String> costIds = new HashSet<>();
+    long totalCpuCost = 0;
+    long totalMemoryCost = 0;
+
+    for (int i = 0; i < costSplits.size(); i++) {
+      CostSplit costSplit = costSplits.get(i);
+      String costSplitPrefix = prefix + "[" + i + "]";
+
+      if (costSplit == null) {
+        errors.add(costSplitPrefix + " cannot be null");
+        continue;
+      }
+
+      // Validate costId
+      String costId = costSplit.getCostId();
+      if (costId == null || costId.trim().isEmpty()) {
+        errors.add(costSplitPrefix + ".costId cannot be null or empty");
+      } else {
+        // Check for duplicate costIds
+        if (costIds.contains(costId)) {
+          errors.add(costSplitPrefix + ".costId '" + costId + "' is duplicated");
+        } else {
+          costIds.add(costId);
+        }
+
+        // Validate costId format (basic validation)
+        if (!isValidCostId(costId)) {
+          errors.add(costSplitPrefix + ".costId '" + costId + "' contains invalid characters");
+        }
+      }
+      // Validate that either both costs are null or both are non-null when sub-allocations exist
+      boolean hasCpuCost = costSplit.getCpuCostNs() != null;
+      boolean hasMemoryCost = costSplit.getMemoryCostBytes() != null;
+      boolean hasSubAllocations = costSplit.getSubAllocations() != null;
+
+      if ((hasCpuCost != hasMemoryCost)) {
+        errors.add(costSplitPrefix + ".cpuCostNs and memoryCostBytes must either both be null or both be non-null");
+        if (hasSubAllocations) {
+          errors.add(costSplitPrefix + ".subAllocations must be null when costs are null");
+        }
+        continue;
+      }
+      // Validate CPU cost
+      Long cpuCostNs = costSplit.getCpuCostNs();
+      if (cpuCostNs != null) {
+        if (cpuCostNs <= 0) {
+          errors.add(costSplitPrefix + ".cpuCostNs should be greater than 0, got: " + cpuCostNs);
+        } else {
+          // Check for potential overflow when summing
+          if (totalCpuCost > Long.MAX_VALUE - cpuCostNs) {
+            errors.add(prefix + " total CPU cost would overflow");
+          } else {
+            totalCpuCost += cpuCostNs;
+          }
+        }
+      }
+
+      // Validate memory cost
+      Long memoryCostBytes = costSplit.getMemoryCostBytes();
+      if (memoryCostBytes != null) {
+        if (memoryCostBytes <= 0) {
+          errors.add(costSplitPrefix + ".memoryCostBytes should be greater than 0, got: " + memoryCostBytes);
+        } else {
+          // Check for potential overflow when summing
+          if (totalMemoryCost > Long.MAX_VALUE - memoryCostBytes) {
+            errors.add(prefix + " total memory cost would overflow");
+          } else {
+            totalMemoryCost += memoryCostBytes;
+          }
+        }
+      }
+
+      // Validate sub-allocations (recursive validation)
+      List<CostSplit> subAllocations = costSplit.getSubAllocations();
+      if (subAllocations != null && !subAllocations.isEmpty() && cpuCostNs != null && memoryCostBytes != null) {
+        validateCostSplitSubAllocations(subAllocations, costSplitPrefix + ".subAllocations",
+                                      cpuCostNs, memoryCostBytes, errors);
+      }
+    }
+  }
+
+  /**
+   * Validates sub-allocations within a CostSplit to ensure they don't exceed parent limits.
+   *
+   * @param subAllocations the list of sub-allocation CostSplit objects
+   * @param prefix the prefix for error messages
+   * @param parentCpuCostNs the parent's CPU cost limit
+   * @param parentMemoryCostBytes the parent's memory cost limit
+   * @param errors the list to add validation errors to
+   */
+  private static void validateCostSplitSubAllocations(List<CostSplit> subAllocations, String prefix,
+                                                     long parentCpuCostNs, long parentMemoryCostBytes,
+                                                     List<String> errors) {
+    if (subAllocations.isEmpty()) {
+      errors.add(prefix + " cannot be empty when specified");
+      return;
+    }
+
+    Set<String> subCostIds = new HashSet<>();
+    long totalSubCpuCost = 0;
+    long totalSubMemoryCost = 0;
+
+    for (int i = 0; i < subAllocations.size(); i++) {
+      CostSplit subAllocation = subAllocations.get(i);
+      String subPrefix = prefix + "[" + i + "]";
+
+      if (subAllocation == null) {
+        errors.add(subPrefix + " cannot be null");
+        continue;
+      }
+
+      // Validate sub-allocation costId
+      String subCostId = subAllocation.getCostId();
+      if (subCostId == null || subCostId.trim().isEmpty()) {
+        errors.add(subPrefix + ".costId cannot be null or empty");
+      } else {
+        // Check for duplicate sub-allocation costIds
+        if (subCostIds.contains(subCostId)) {
+          errors.add(subPrefix + ".costId '" + subCostId + "' is duplicated 
within sub-allocations");
+        } else {
+          subCostIds.add(subCostId);
+        }
+
+        if (!isValidCostId(subCostId)) {
+          errors.add(subPrefix + ".costId '" + subCostId + "' contains invalid 
characters");
+        }
+      }
+
+      // Validate sub-allocation costs
+      long subCpuCostNs = subAllocation.getCpuCostNs();
+      long subMemoryCostBytes = subAllocation.getMemoryCostBytes();
+
+      if (subCpuCostNs < 0) {
+        errors.add(subPrefix + ".cpuCostNs cannot be negative, got: " + 
subCpuCostNs);
+      } else if (subCpuCostNs == 0) {
+        errors.add(subPrefix + ".cpuCostNs should be positive, got: " + 
subCpuCostNs);
+      } else {
+        totalSubCpuCost += subCpuCostNs;
+      }
+
+      if (subMemoryCostBytes < 0) {
+        errors.add(subPrefix + ".memoryCostBytes cannot be negative, got: " + 
subMemoryCostBytes);
+      } else if (subMemoryCostBytes == 0) {
+        errors.add(subPrefix + ".memoryCostBytes should be positive, got: " + 
subMemoryCostBytes);
+      } else {
+        totalSubMemoryCost += subMemoryCostBytes;
+      }
+
+      // Sub-allocations should not have their own sub-allocations (prevent 
deep nesting)
+      if (subAllocation.getSubAllocations() != null && 
!subAllocation.getSubAllocations().isEmpty()) {
+        errors.add(subPrefix + ".subAllocations nested sub-allocations are not 
supported");
+      }
+    }
+
+    // Validate that sub-allocations don't exceed parent limits
+    if (totalSubCpuCost > parentCpuCostNs) {
+      errors.add(prefix + " total CPU cost (" + totalSubCpuCost
+          + "ns) exceeds parent limit (" + parentCpuCostNs + "ns)");
+    }
+
+    if (totalSubMemoryCost > parentMemoryCostBytes) {
+      errors.add(prefix + " total memory cost (" + totalSubMemoryCost
+          + " bytes) exceeds parent limit (" + parentMemoryCostBytes + " 
bytes)");
+    }
+  }
+
+  /**
+   * Validates that a costId contains only valid characters.
+   * Cost IDs should be alphanumeric with underscores, hyphens, and dots 
allowed.
+   *
+   * @param costId the cost ID to validate
+   * @return true if the cost ID is valid, false otherwise
+   */
+  private static boolean isValidCostId(String costId) {
+    if (costId == null || costId.trim().isEmpty()) {

Review Comment:
   This check is redundant as we are also checking in L237



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java:
##########
@@ -226,9 +229,212 @@ public static List<String> validateQueryWorkloadConfig(QueryWorkloadConfig confi
           if (propagationScheme.getPropagationType() == null) {
             errors.add(prefix + ".propagationScheme.type cannot be null");
           }
+          // Validate CostSplits
+          validateCostSplits(propagationScheme.getCostSplits(), prefix + ".propagationScheme.costSplits", errors);
         }
       }
     }
     return errors;
   }
+
+  /**
+   * Validates a list of CostSplit objects and their nested sub-allocations.
+   *
+   * @param costSplits the list of CostSplit objects to validate
+   * @param prefix the prefix for error messages
+   * @param errors the list to add validation errors to
+   */
+  private static void validateCostSplits(List<CostSplit> costSplits, String prefix, List<String> errors) {
+    if (costSplits == null) {
+      errors.add(prefix + " cannot be null");
+      return;
+    }
+
+    if (costSplits.isEmpty()) {
+      errors.add(prefix + " cannot be empty");
+      return;
+    }
+
+    Set<String> costIds = new HashSet<>();
+    long totalCpuCost = 0;
+    long totalMemoryCost = 0;
+
+    for (int i = 0; i < costSplits.size(); i++) {
+      CostSplit costSplit = costSplits.get(i);
+      String costSplitPrefix = prefix + "[" + i + "]";
+
+      if (costSplit == null) {
+        errors.add(costSplitPrefix + " cannot be null");
+        continue;
+      }
+
+      // Validate costId
+      String costId = costSplit.getCostId();
+      if (costId == null || costId.trim().isEmpty()) {
+        errors.add(costSplitPrefix + ".costId cannot be null or empty");
+      } else {
+        // Check for duplicate costIds
+        if (costIds.contains(costId)) {
+          errors.add(costSplitPrefix + ".costId '" + costId + "' is duplicated");
+        } else {
+          costIds.add(costId);
+        }
+
+        // Validate costId format (basic validation)
+        if (!isValidCostId(costId)) {
+          errors.add(costSplitPrefix + ".costId '" + costId + "' contains invalid characters");
+        }
+      }
+      // Validate that either both costs are null or both are non-null when sub-allocations exist
+      boolean hasCpuCost = costSplit.getCpuCostNs() != null;
+      boolean hasMemoryCost = costSplit.getMemoryCostBytes() != null;
+      boolean hasSubAllocations = costSplit.getSubAllocations() != null;
+
+      if ((hasCpuCost != hasMemoryCost)) {
+        errors.add(costSplitPrefix + ".cpuCostNs and memoryCostBytes must either both be null or both be non-null");
+        if (hasSubAllocations) {
+          errors.add(costSplitPrefix + ".subAllocations must be null when costs are null");

Review Comment:
   what about the condition where hasSubAllocations = true but both hasCpuCost 
and hasMemoryCost are false? 
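A minimal standalone sketch of the tightened check the reviewer is asking about, covering the case where sub-allocations exist while both costs are null. `CostSplit` here is a simplified stand-in for the real `org.apache.pinot.spi.config.workload.CostSplit`, not the actual class.

```java
import java.util.ArrayList;
import java.util.List;

public class CostSplitNullCheck {
  static class CostSplit {
    Long _cpuCostNs;
    Long _memoryCostBytes;
    List<CostSplit> _subAllocations;
    CostSplit(Long cpuCostNs, Long memoryCostBytes, List<CostSplit> subAllocations) {
      _cpuCostNs = cpuCostNs;
      _memoryCostBytes = memoryCostBytes;
      _subAllocations = subAllocations;
    }
  }

  static List<String> validate(CostSplit split, String prefix) {
    List<String> errors = new ArrayList<>();
    boolean hasCpuCost = split._cpuCostNs != null;
    boolean hasMemoryCost = split._memoryCostBytes != null;
    boolean hasSubAllocations = split._subAllocations != null && !split._subAllocations.isEmpty();
    // Costs must be set together or omitted together
    if (hasCpuCost != hasMemoryCost) {
      errors.add(prefix + ".cpuCostNs and memoryCostBytes must both be null or both be non-null");
    }
    // The case the review raises: sub-allocations present while BOTH costs are null
    if (hasSubAllocations && !hasCpuCost && !hasMemoryCost) {
      errors.add(prefix + ".subAllocations requires cpuCostNs and memoryCostBytes to be set");
    }
    return errors;
  }

  public static void main(String[] args) {
    CostSplit bothNullWithSubs = new CostSplit(null, null,
        List.of(new CostSplit(1L, 1L, null)));
    // Flags exactly one error: sub-allocations without parent costs
    System.out.println(validate(bothNullWithSubs, "costSplits[0]"));
  }
}
```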



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java:
##########
@@ -702,4 +702,16 @@ public String forClusterConfigUpdate() {
   public String forClusterConfigDelete(String config) {
     return StringUtil.join("/", _baseUrl, "cluster", "configs", config);
   }
+
+  public String forQueryWorkloadConfigUpdate() {
+    return StringUtil.join("/", _baseUrl, "queryWorkloadConfigs");
+  }
+
+  public String forQueryWorkloadConfigDelete(String config) {

Review Comment:
   forQueryWorkloadConfigDelete and forQueryWorkloadConfigGet look exactly the 
same. Can we have one function that does both? Give it a generic name please. 
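A hedged sketch of the single generic method the reviewer suggests; the name `forQueryWorkloadConfig` is a suggestion, and `StringUtil.join` is mimicked by `String.join` so the snippet runs standalone.

```java
public class WorkloadUrlBuilder {
  private final String _baseUrl;

  public WorkloadUrlBuilder(String baseUrl) {
    _baseUrl = baseUrl;
  }

  // One method serves GET and DELETE, since both target the same path
  public String forQueryWorkloadConfig(String workloadName) {
    return String.join("/", _baseUrl, "queryWorkloadConfigs", workloadName);
  }

  public static void main(String[] args) {
    WorkloadUrlBuilder builder = new WorkloadUrlBuilder("http://localhost:9000");
    System.out.println(builder.forQueryWorkloadConfig("workload-foo1"));
    // → http://localhost:9000/queryWorkloadConfigs/workload-foo1
  }
}
```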



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java:
##########
@@ -226,9 +229,212 @@ public static List<String> validateQueryWorkloadConfig(QueryWorkloadConfig confi
           if (propagationScheme.getPropagationType() == null) {
             errors.add(prefix + ".propagationScheme.type cannot be null");
           }
+          // Validate CostSplits
+          validateCostSplits(propagationScheme.getCostSplits(), prefix + ".propagationScheme.costSplits", errors);
         }
       }
     }
     return errors;
   }
+
+  /**
+   * Validates a list of CostSplit objects and their nested sub-allocations.
+   *
+   * @param costSplits the list of CostSplit objects to validate
+   * @param prefix the prefix for error messages
+   * @param errors the list to add validation errors to
+   */
+  private static void validateCostSplits(List<CostSplit> costSplits, String prefix, List<String> errors) {
+    if (costSplits == null) {
+      errors.add(prefix + " cannot be null");
+      return;
+    }
+
+    if (costSplits.isEmpty()) {
+      errors.add(prefix + " cannot be empty");
+      return;
+    }
+
+    Set<String> costIds = new HashSet<>();
+    long totalCpuCost = 0;
+    long totalMemoryCost = 0;
+
+    for (int i = 0; i < costSplits.size(); i++) {
+      CostSplit costSplit = costSplits.get(i);
+      String costSplitPrefix = prefix + "[" + i + "]";
+
+      if (costSplit == null) {
+        errors.add(costSplitPrefix + " cannot be null");
+        continue;
+      }
+
+      // Validate costId
+      String costId = costSplit.getCostId();
+      if (costId == null || costId.trim().isEmpty()) {
+        errors.add(costSplitPrefix + ".costId cannot be null or empty");
+      } else {
+        // Check for duplicate costIds
+        if (costIds.contains(costId)) {
+          errors.add(costSplitPrefix + ".costId '" + costId + "' is duplicated");
+        } else {
+          costIds.add(costId);
+        }
+
+        // Validate costId format (basic validation)
+        if (!isValidCostId(costId)) {
+          errors.add(costSplitPrefix + ".costId '" + costId + "' contains invalid characters");
+        }
+      }
+      // Validate that either both costs are null or both are non-null when sub-allocations exist
+      boolean hasCpuCost = costSplit.getCpuCostNs() != null;
+      boolean hasMemoryCost = costSplit.getMemoryCostBytes() != null;
+      boolean hasSubAllocations = costSplit.getSubAllocations() != null;
+
+      if ((hasCpuCost != hasMemoryCost)) {
+        errors.add(costSplitPrefix + ".cpuCostNs and memoryCostBytes must either both be null or both be non-null");
+        if (hasSubAllocations) {
+          errors.add(costSplitPrefix + ".subAllocations must be null when costs are null");
+        }
+        continue;
+      }
+      // Validate CPU cost
+      Long cpuCostNs = costSplit.getCpuCostNs();
+      if (cpuCostNs != null) {
+        if (cpuCostNs <= 0) {
+          errors.add(costSplitPrefix + ".cpuCostNs should be greater than 0, got: " + cpuCostNs);
+        } else {
+          // Check for potential overflow when summing
+          if (totalCpuCost > Long.MAX_VALUE - cpuCostNs) {
+            errors.add(prefix + " total CPU cost would overflow");
+          } else {
+            totalCpuCost += cpuCostNs;
+          }
+        }
+      }
+
+      // Validate memory cost
+      Long memoryCostBytes = costSplit.getMemoryCostBytes();
+      if (memoryCostBytes != null) {
+        if (memoryCostBytes <= 0) {
+          errors.add(costSplitPrefix + ".memoryCostBytes should be greater than 0, got: " + memoryCostBytes);
+        } else {
+          // Check for potential overflow when summing
+          if (totalMemoryCost > Long.MAX_VALUE - memoryCostBytes) {
+            errors.add(prefix + " total memory cost would overflow");
+          } else {
+            totalMemoryCost += memoryCostBytes;
+          }
+        }
+      }
+
+      // Validate sub-allocations (recursive validation)
+      List<CostSplit> subAllocations = costSplit.getSubAllocations();
+      if (subAllocations != null && !subAllocations.isEmpty() && cpuCostNs != null && memoryCostBytes != null) {
+        validateCostSplitSubAllocations(subAllocations, costSplitPrefix + ".subAllocations",
+                                      cpuCostNs, memoryCostBytes, errors);
+      }
+    }
+  }
+
+  /**
+   * Validates sub-allocations within a CostSplit to ensure they don't exceed parent limits.
+   *
+   * @param subAllocations the list of sub-allocation CostSplit objects
+   * @param prefix the prefix for error messages
+   * @param parentCpuCostNs the parent's CPU cost limit
+   * @param parentMemoryCostBytes the parent's memory cost limit
+   * @param errors the list to add validation errors to
+   */
+  private static void validateCostSplitSubAllocations(List<CostSplit> subAllocations, String prefix,
+                                                     long parentCpuCostNs, long parentMemoryCostBytes,
+                                                     List<String> errors) {
+    if (subAllocations.isEmpty()) {
+      errors.add(prefix + " cannot be empty when specified");
+      return;
+    }
+
+    Set<String> subCostIds = new HashSet<>();
+    long totalSubCpuCost = 0;
+    long totalSubMemoryCost = 0;
+
+    for (int i = 0; i < subAllocations.size(); i++) {
+      CostSplit subAllocation = subAllocations.get(i);
+      String subPrefix = prefix + "[" + i + "]";
+
+      if (subAllocation == null) {
+        errors.add(subPrefix + " cannot be null");
+        continue;
+      }
+
+      // Validate sub-allocation costId
+      String subCostId = subAllocation.getCostId();
+      if (subCostId == null || subCostId.trim().isEmpty()) {
+        errors.add(subPrefix + ".costId cannot be null or empty");
+      } else {
+        // Check for duplicate sub-allocation costIds
+        if (subCostIds.contains(subCostId)) {
+          errors.add(subPrefix + ".costId '" + subCostId + "' is duplicated within sub-allocations");
+        } else {
+          subCostIds.add(subCostId);
+        }
+
+        if (!isValidCostId(subCostId)) {
+          errors.add(subPrefix + ".costId '" + subCostId + "' contains invalid characters");
+        }
+      }
+
+      // Validate sub-allocation costs
+      long subCpuCostNs = subAllocation.getCpuCostNs();
+      long subMemoryCostBytes = subAllocation.getMemoryCostBytes();
+
+      if (subCpuCostNs < 0) {
+        errors.add(subPrefix + ".cpuCostNs cannot be negative, got: " + subCpuCostNs);
+      } else if (subCpuCostNs == 0) {
+        errors.add(subPrefix + ".cpuCostNs should be positive, got: " + subCpuCostNs);
+      } else {
+        totalSubCpuCost += subCpuCostNs;
+      }
+
+      if (subMemoryCostBytes < 0) {
+        errors.add(subPrefix + ".memoryCostBytes cannot be negative, got: " + subMemoryCostBytes);
+      } else if (subMemoryCostBytes == 0) {
+        errors.add(subPrefix + ".memoryCostBytes should be positive, got: " + subMemoryCostBytes);
+      } else {
+        totalSubMemoryCost += subMemoryCostBytes;
+      }
+
+      // Sub-allocations should not have their own sub-allocations (prevent deep nesting)
+      if (subAllocation.getSubAllocations() != null && !subAllocation.getSubAllocations().isEmpty()) {
+        errors.add(subPrefix + ".subAllocations nested sub-allocations are not supported");
+      }
+    }
+
+    // Validate that sub-allocations don't exceed parent limits
+    if (totalSubCpuCost > parentCpuCostNs) {
+      errors.add(prefix + " total CPU cost (" + totalSubCpuCost
+          + "ns) exceeds parent limit (" + parentCpuCostNs + "ns)");
+    }
+
+    if (totalSubMemoryCost > parentMemoryCostBytes) {
+      errors.add(prefix + " total memory cost (" + totalSubMemoryCost
+          + " bytes) exceeds parent limit (" + parentMemoryCostBytes + " bytes)");
+    }
+  }
+
+  /**
+   * Validates that a costId contains only valid characters.
+   * Cost IDs should be alphanumeric with underscores, hyphens, and dots allowed.
+   *
+   * @param costId the cost ID to validate
+   * @return true if the cost ID is valid, false otherwise
+   */
+  private static boolean isValidCostId(String costId) {
+    if (costId == null || costId.trim().isEmpty()) {
+      return false;
+    }
+
+    // Allow alphanumeric characters, underscores, hyphens, and dots
+    // This covers table names, tenant names, and other common identifiers in Pinot
+    return costId.matches("^[a-zA-Z0-9_.-]+$");

Review Comment:
   This seems unnecessary. If we relax tableName validation in the future, this would have to change too. Can we remove it?
   
   A better check would be to validate that the tableName/tenantName actually exists. 



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java:
##########
@@ -227,4 +240,62 @@ private SegmentConsumerInfo getSegmentConsumerInfo(SegmentDataManager segmentDat
     }
     return segmentConsumerInfo;
   }
+
+  /**
+   * Get the instance cost (budget) information for a specific workload.

Review Comment:
   Can we also include the remaining cost information in the response? I think 
that will be useful as well for debugging. 
   



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java:
##########
@@ -51,69 +69,141 @@ public class QueryWorkloadManager {
   private final CostSplitter _costSplitter;
 
   public QueryWorkloadManager(PinotHelixResourceManager pinotHelixResourceManager) {

Review Comment:
   Annotate Nullable to signature if pinotHelixResourceManager can be null



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/QueryWorkloadManager.java:
##########
@@ -170,34 +297,111 @@ public Map<String, InstanceCost> getWorkloadToInstanceCostFor(String instanceNam
         return workloadToInstanceCostMap;
       }
 
-      // Find all workloads associated with the helix tags
+      // Find all helix tags for this instance
+      InstanceConfig instanceConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceName);
+      if (instanceConfig == null) {
+        LOGGER.warn("Instance config not found for instance: {}", instanceName);
+        return workloadToInstanceCostMap;
+      }
+
+      List<String> instanceTags = instanceConfig.getTags();
+      if (instanceTags == null || instanceTags.isEmpty()) {
+        LOGGER.warn("No tags found for instance: {}, cannot compute workload costs", instanceName);
+        return workloadToInstanceCostMap;
+      }
+
+      // Filter workloads by the instance's tags
       Set<QueryWorkloadConfig> queryWorkloadConfigsForTags =
-          PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, instanceConfig.getTags(),
-                  queryWorkloadConfigs);
-      // Calculate the instance cost from each workload
+          PropagationUtils.getQueryWorkloadConfigsForTags(_pinotHelixResourceManager, instanceTags,
+              queryWorkloadConfigs);
+
+      if (queryWorkloadConfigsForTags.isEmpty()) {
+        LOGGER.debug("No workload configs match instance: {}", instanceName);
+        return workloadToInstanceCostMap;
+      }
+
+      // For each workload, aggregate contributions across all applicable nodeConfigs and costSplits
       for (QueryWorkloadConfig queryWorkloadConfig : queryWorkloadConfigsForTags) {
         for (NodeConfig nodeConfig : queryWorkloadConfig.getNodeConfigs()) {
-          if (nodeConfig.getNodeType() == nodeType) {
-            Set<String> instances = resolveInstances(nodeConfig);
-            InstanceCost instanceCost = _costSplitter.computeInstanceCost(nodeConfig, instances, instanceName);
-            if (instanceCost != null) {
-              workloadToInstanceCostMap.put(queryWorkloadConfig.getQueryWorkloadName(), instanceCost);
+          try {
+            if (nodeConfig.getNodeType() == nodeType) {
+              List<String> errors = QueryWorkloadConfigUtils.validateQueryWorkloadConfig(queryWorkloadConfig);
+              if (!errors.isEmpty()) {
+                LOGGER.error("Invalid QueryWorkloadConfig: {} for instance: {}, errors: {}", queryWorkloadConfig,
+                    instanceName, errors);
+                continue;
+              }
+              Map<String, InstanceCost> instanceCostMap =
+                  _propagationSchemeProvider.getPropagationScheme(nodeConfig.getPropagationScheme()
+                          .getPropagationType()).resolveInstanceCostMap(nodeConfig, _costSplitter);
+
+              InstanceCost instanceCost = instanceCostMap.get(instanceName);
+              if (instanceCost != null) {
+                workloadToInstanceCostMap.put(queryWorkloadConfig.getQueryWorkloadName(), instanceCost);
+                LOGGER.info("Found workload cost for instance: {} workload: {} cost: {}",
+                    instanceName, queryWorkloadConfig.getQueryWorkloadName(), instanceCost);
+              }
+              // There should be only one matching nodeConfig (BROKER_NODE or SERVER_NODE) within a workload
+              break;
             }
-            break;
+          } catch (Exception e) {
+            LOGGER.error("Failed to compute instance cost for instance: {} workload: {}",
+                instanceName, queryWorkloadConfig.getQueryWorkloadName(), e);
+            // Continue with other workloads instead of failing completely
           }
         }
       }
+      LOGGER.info("Computed {} workload costs for instance: {}", workloadToInstanceCostMap.size(), instanceName);
       return workloadToInstanceCostMap;
     } catch (Exception e) {
-      String errorMsg = String.format("Failed to get workload to instance cost map for instance: %s", instanceName);
+      String errorMsg = String.format("Failed to compute workload costs for instance: %s", instanceName);
       LOGGER.error(errorMsg, e);
       throw new RuntimeException(errorMsg, e);
     }
   }
 
   private Set<String> resolveInstances(NodeConfig nodeConfig) {
     PropagationScheme propagationScheme =
-            _propagationSchemeProvider.getPropagationScheme(nodeConfig.getPropagationScheme().getPropagationType());
+        _propagationSchemeProvider.getPropagationScheme(nodeConfig.getPropagationScheme().getPropagationType());
     return propagationScheme.resolveInstances(nodeConfig);
   }
+
+  private void checkAndDistributeEmptyCostSplitsEvenly(NodeConfig nodeConfig) {
+    // Check for empty cost splits
+    List<CostSplit> costSplits = nodeConfig.getPropagationScheme().getCostSplits();
+    List<CostSplit> emptySplits = new ArrayList<>();
+    List<CostSplit> nonEmptySplits = new ArrayList<>();
Review Comment:
   Is this for handling the case where some costSplits are empty and some are 
not? 
   
   This unnecessarily complicates logic we have to maintain. Just update the costSplit validation to check that: 
   "Either all costSplits should be empty or all should be non-empty" 
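A minimal sketch of the all-or-none invariant the reviewer proposes, modeling each costSplit's cost as a nullable `Long` (simplified types, not the real Pinot classes).

```java
import java.util.Arrays;
import java.util.List;

public class AllOrNoneCostCheck {
  // Returns null when the invariant holds, otherwise an error message
  static String checkAllOrNone(List<Long> costSplitCpuCosts) {
    long withCost = costSplitCpuCosts.stream().filter(c -> c != null).count();
    if (withCost != 0 && withCost != costSplitCpuCosts.size()) {
      return "Either all costSplits must specify costs or none may";
    }
    return null;
  }

  public static void main(String[] args) {
    // Mixed empty/non-empty splits violate the invariant
    System.out.println(checkAllOrNone(Arrays.asList(1L, null)) != null); // → true (invalid)
    // Uniform splits pass
    System.out.println(checkAllOrNone(Arrays.asList(1L, 2L)) == null);   // → true (valid)
  }
}
```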



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java:
##########
@@ -226,9 +229,212 @@ public static List<String> validateQueryWorkloadConfig(QueryWorkloadConfig confi
           if (propagationScheme.getPropagationType() == null) {
             errors.add(prefix + ".propagationScheme.type cannot be null");
           }
+          // Validate CostSplits
+          validateCostSplits(propagationScheme.getCostSplits(), prefix + ".propagationScheme.costSplits", errors);
         }
       }
     }
     return errors;
   }
+
+  /**
+   * Validates a list of CostSplit objects and their nested sub-allocations.
+   *
+   * @param costSplits the list of CostSplit objects to validate
+   * @param prefix the prefix for error messages
+   * @param errors the list to add validation errors to
+   */
+  private static void validateCostSplits(List<CostSplit> costSplits, String prefix, List<String> errors) {
+    if (costSplits == null) {
+      errors.add(prefix + " cannot be null");
+      return;
+    }
+
+    if (costSplits.isEmpty()) {
+      errors.add(prefix + " cannot be empty");
+      return;
+    }
+
+    Set<String> costIds = new HashSet<>();
+    long totalCpuCost = 0;

Review Comment:
   We should also validate that totalCpuCost matches the workload-level CPU cost. Same for memory. 
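A sketch of the additional check suggested here: the summed per-split CPU cost should equal the workload-level (enforcement profile) CPU cost. Names and types are simplified stand-ins.

```java
import java.util.List;

public class CostSumCheck {
  // Returns null when the totals line up, otherwise an error message
  static String checkTotals(List<Long> costSplitCpuNs, long workloadCpuNs) {
    long total = costSplitCpuNs.stream().mapToLong(Long::longValue).sum();
    if (total != workloadCpuNs) {
      return "costSplits CPU total (" + total + "ns) does not match workload CPU cost ("
          + workloadCpuNs + "ns)";
    }
    return null;
  }

  public static void main(String[] args) {
    // Splits summing to the workload budget pass
    System.out.println(checkTotals(List.of(300L, 200L), 500L) == null);  // → true
    // A mismatch is reported
    System.out.println(checkTotals(List.of(300L, 300L), 500L) != null); // → true
  }
}
```

The same helper would be applied a second time with `memoryCostBytes` values against the workload memory budget.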



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/PropagationScheme.java:
##########
@@ -33,4 +36,6 @@ public interface PropagationScheme {
    * @return The set of instances to propagate the workload
    */
   Set<String> resolveInstances(NodeConfig nodeConfig);
+
+  Map<String, InstanceCost> resolveInstanceCostMap(NodeConfig nodeConfig, CostSplitter costSplitter);

Review Comment:
   Please add comments to interface functions. 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/TablePropagationScheme.java:
##########
@@ -19,60 +19,192 @@
 package org.apache.pinot.controller.workload.scheme;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.workload.splitter.CostSplitter;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.workload.CostSplit;
+import org.apache.pinot.spi.config.workload.InstanceCost;
 import org.apache.pinot.spi.config.workload.NodeConfig;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 /**
- * TablePropagationScheme is used to resolve instances based on the {@link NodeConfig} and {@link NodeConfig.Type}
- * It resolves the instances based on the table names specified in the node configuration
+ * A {@code TablePropagationScheme} resolves Pinot instances based on table names in a node
+ * configuration.
+ *
+ * <p>
+ * This scheme looks up Helix tags for offline and realtime tables and maps them to
+ * instances, enabling workload propagation by table.
+ * </p>
  */
 public class TablePropagationScheme implements PropagationScheme {
 
-  private static PinotHelixResourceManager _pinotHelixResourceManager;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
 
   public TablePropagationScheme(PinotHelixResourceManager pinotHelixResourceManager) {
     _pinotHelixResourceManager = pinotHelixResourceManager;
   }
 
-  @Override
+  /**
+   * Resolves the union of all instances across all cost splits for the given node config.
+   *
+   * @param nodeConfig Node configuration containing propagation scheme and cost splits.
+   * @return A set of instance names that should receive workload messages.
+   * @throws IllegalArgumentException If no instances are found for a cost split.
+   */
   public Set<String> resolveInstances(NodeConfig nodeConfig) {
     Set<String> instances = new HashSet<>();
-    List<String> tableNames = nodeConfig.getPropagationScheme().getValues();
-    Map<String, Map<NodeConfig.Type, Set<String>>> tableWithTypeToHelixTags
-            = PropagationUtils.getTableToHelixTags(_pinotHelixResourceManager);
-    Map<String, Set<String>> helixTagToInstances
-            = PropagationUtils.getHelixTagToInstances(_pinotHelixResourceManager);
-    for (String tableName : tableNames) {
-      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
-      List<String> tablesWithType = new ArrayList<>();
-      if (tableType == null) {
-        // Get both offline and realtime table names if type is not present.
-        tablesWithType.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
-        tablesWithType.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
-      } else {
-        tablesWithType.add(tableName);
+    Map<String, Map<NodeConfig.Type, Set<String>>> tableWithTypeToHelixTags =

Review Comment:
   If there is an exception while collecting tableToHelixTags how is it handled 
here? 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/workload/scheme/DefaultPropagationScheme.java:
##########
@@ -48,4 +52,21 @@ public Set<String> resolveInstances(NodeConfig nodeConfig) {
     }
     return instances;
   }
+
+  @Override
+  public Map<String, InstanceCost> resolveInstanceCostMap(NodeConfig nodeConfig, CostSplitter costSplitter) {
+    Set<String> instances = resolveInstances(nodeConfig);
+    if (instances.isEmpty()) {
+      throw new IllegalArgumentException("No instances found for node config: " + nodeConfig);
+    }
+    }
+    InstanceCost instanceCost = new InstanceCost(
+        nodeConfig.getEnforcementProfile().getCpuCostNs() / instances.size(),

Review Comment:
   We should use the costSplitter here to split the cost? 
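A hedged sketch of delegating the division to the injected splitter rather than dividing inline, as the reviewer suggests; `CostSplitter` and `InstanceCost` here are simplified stand-ins for the real Pinot interfaces.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class EvenCostSplit {
  record InstanceCost(long cpuCostNs, long memoryCostBytes) { }

  interface CostSplitter {
    Map<String, InstanceCost> split(long cpuNs, long memBytes, Set<String> instances);
  }

  // An even splitter: each instance receives an equal share of the node budget
  static final CostSplitter EVEN = (cpuNs, memBytes, instances) -> {
    Map<String, InstanceCost> result = new HashMap<>();
    for (String instance : instances) {
      result.put(instance,
          new InstanceCost(cpuNs / instances.size(), memBytes / instances.size()));
    }
    return result;
  };

  public static void main(String[] args) {
    // 1000ns CPU and 2000 bytes split across two servers → 500ns / 1000 bytes each
    Map<String, InstanceCost> costs = EVEN.split(1000L, 2000L, Set.of("server-0", "server-1"));
    System.out.println(costs.get("server-0"));
  }
}
```

The point of routing through the splitter interface is that swapping in a weighted strategy later requires no change at the call site.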



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryWorkloadConfigUtils.java:
##########
@@ -226,9 +229,212 @@ public static List<String> validateQueryWorkloadConfig(QueryWorkloadConfig confi
           if (propagationScheme.getPropagationType() == null) {
             errors.add(prefix + ".propagationScheme.type cannot be null");
           }
+          // Validate CostSplits
+          validateCostSplits(propagationScheme.getCostSplits(), prefix + ".propagationScheme.costSplits", errors);
         }
       }
     }
     return errors;
   }
+
+  /**
+   * Validates a list of CostSplit objects and their nested sub-allocations.
+   *
+   * @param costSplits the list of CostSplit objects to validate
+   * @param prefix the prefix for error messages
+   * @param errors the list to add validation errors to
+   */
+  private static void validateCostSplits(List<CostSplit> costSplits, String prefix, List<String> errors) {
+    if (costSplits == null) {
+      errors.add(prefix + " cannot be null");
+      return;
+    }
+
+    if (costSplits.isEmpty()) {
+      errors.add(prefix + " cannot be empty");
+      return;
+    }
+
+    Set<String> costIds = new HashSet<>();
+    long totalCpuCost = 0;
+    long totalMemoryCost = 0;
+
+    for (int i = 0; i < costSplits.size(); i++) {
+      CostSplit costSplit = costSplits.get(i);
+      String costSplitPrefix = prefix + "[" + i + "]";
+
+      if (costSplit == null) {
+        errors.add(costSplitPrefix + " cannot be null");
+        continue;
+      }
+
+      // Validate costId
+      String costId = costSplit.getCostId();
+      if (costId == null || costId.trim().isEmpty()) {
+        errors.add(costSplitPrefix + ".costId cannot be null or empty");
+      } else {
+        // Check for duplicate costIds
+        if (costIds.contains(costId)) {
+          errors.add(costSplitPrefix + ".costId '" + costId + "' is duplicated");
+        } else {
+          costIds.add(costId);
+        }
+
+        // Validate costId format (basic validation)
+        if (!isValidCostId(costId)) {
+          errors.add(costSplitPrefix + ".costId '" + costId + "' contains invalid characters");
+        }
+      }
+      // Validate that either both costs are null or both are non-null when sub-allocations exist
+      boolean hasCpuCost = costSplit.getCpuCostNs() != null;
+      boolean hasMemoryCost = costSplit.getMemoryCostBytes() != null;
+      boolean hasSubAllocations = costSplit.getSubAllocations() != null;
+
+      if ((hasCpuCost != hasMemoryCost)) {
+        errors.add(costSplitPrefix + ".cpuCostNs and memoryCostBytes must either both be null or both be non-null");
+        if (hasSubAllocations) {
+          errors.add(costSplitPrefix + ".subAllocations must be null when costs are null");
+        }
+        continue;
+      }
+      // Validate CPU cost
+      Long cpuCostNs = costSplit.getCpuCostNs();
+      if (cpuCostNs != null) {
+        if (cpuCostNs <= 0) {
+          errors.add(costSplitPrefix + ".cpuCostNs should be greater than 0, got: " + cpuCostNs);
+        } else {
+          // Check for potential overflow when summing
+          if (totalCpuCost > Long.MAX_VALUE - cpuCostNs) {
+            errors.add(prefix + " total CPU cost would overflow");
+          } else {
+            totalCpuCost += cpuCostNs;
+          }
+        }
+      }
+
+      // Validate memory cost
+      Long memoryCostBytes = costSplit.getMemoryCostBytes();
+      if (memoryCostBytes != null) {
+        if (memoryCostBytes <= 0) {
+          errors.add(costSplitPrefix + ".memoryCostBytes should be greater than 0, got: " + memoryCostBytes);
+        } else {
+          // Check for potential overflow when summing
+          if (totalMemoryCost > Long.MAX_VALUE - memoryCostBytes) {
+            errors.add(prefix + " total memory cost would overflow");
+          } else {
+            totalMemoryCost += memoryCostBytes;
+          }
+        }
+      }
+
+      // Validate sub-allocations (recursive validation)
+      List<CostSplit> subAllocations = costSplit.getSubAllocations();
+      if (subAllocations != null && !subAllocations.isEmpty() && cpuCostNs != null && memoryCostBytes != null) {
+        validateCostSplitSubAllocations(subAllocations, costSplitPrefix + ".subAllocations",
+                                      cpuCostNs, memoryCostBytes, errors);
+      }
+    }
+  }
+
+  /**
+   * Validates sub-allocations within a CostSplit to ensure they don't exceed parent limits.
+   *
+   * @param subAllocations the list of sub-allocation CostSplit objects
+   * @param prefix the prefix for error messages
+   * @param parentCpuCostNs the parent's CPU cost limit
+   * @param parentMemoryCostBytes the parent's memory cost limit
+   * @param errors the list to add validation errors to
+   */
+  private static void validateCostSplitSubAllocations(List<CostSplit> subAllocations, String prefix,
+                                                     long parentCpuCostNs, long parentMemoryCostBytes,
+                                                     List<String> errors) {
+    if (subAllocations.isEmpty()) {

Review Comment:
   Most of the conditions here are the same as the costSplit validation above. Can they be generalized into a helper function? 
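A minimal sketch of the kind of generalization suggested here: one helper validates a cost value regardless of whether it belongs to a top-level costSplit or a sub-allocation, with only the error prefix differing. The helper name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class CostValidationHelper {
  // Shared check used for both top-level splits and sub-allocations;
  // the caller supplies the error-message prefix and field name
  static void validateCost(String prefix, String field, long value, List<String> errors) {
    if (value <= 0) {
      errors.add(prefix + "." + field + " should be greater than 0, got: " + value);
    }
  }

  public static void main(String[] args) {
    List<String> errors = new ArrayList<>();
    // Top-level costSplit use
    validateCost("costSplits[0]", "cpuCostNs", 0, errors);
    // Sub-allocation use: same logic, different prefix
    validateCost("costSplits[0].subAllocations[1]", "memoryCostBytes", -5, errors);
    errors.forEach(System.out::println);
  }
}
```

Duplicate-costId detection and parent-limit summing could be factored the same way, leaving only the nesting-depth rule specific to sub-allocations.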



##########
pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java:
##########
@@ -178,6 +178,14 @@ public BudgetStats getRemainingBudgetAcrossAllWorkloads() {
     return new BudgetStats(totalCpuBudget, totalMemoryBudget, totalCpuRemaining, totalMemRemaining);
   }
 
+  public BudgetStats getBudgetStats(String workload) {

Review Comment:
   Please add comments as to why this is needed. 


