kfaraz commented on code in PR #18634:
URL: https://github.com/apache/druid/pull/18634#discussion_r2464343154


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java:
##########
@@ -105,12 +109,31 @@ public String getDefaultCategory()
       return defaultCategory;
     }
 
+    /**
+     * Returns a map of datasource names to worker category names.

Review Comment:
   ```suggestion
        * Returns a map from datasource name to the worker category name to be used for tasks of that datasource.
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java:
##########
@@ -105,12 +109,31 @@ public String getDefaultCategory()
       return defaultCategory;
     }
 
+    /**
+     * Returns a map of datasource names to worker category names.
+     * Used to assign tasks to specific worker categories based on their datasource.
+     *
+     * @return map where key is datasource name and value is worker category name
+     */
     @JsonProperty
     public Map<String, String> getCategoryAffinity()
     {
       return categoryAffinity;
     }
 
+    /**
+     * Returns a map of supervisor IDs to worker category names.
+     * Used to assign tasks to specific worker categories based on their supervisor ID.
+     * This takes precedence over {@link #getCategoryAffinity()} when both are configured.
+     *
+     * @return map where key is supervisor ID and value is worker category name

Review Comment:
   Nit: this `@return` line can be omitted.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java:
##########
@@ -105,12 +109,31 @@ public String getDefaultCategory()
       return defaultCategory;
     }
 
+    /**
+     * Returns a map of datasource names to worker category names.
+     * Used to assign tasks to specific worker categories based on their datasource.
+     *
+     * @return map where key is datasource name and value is worker category name
+     */
     @JsonProperty
     public Map<String, String> getCategoryAffinity()
     {
       return categoryAffinity;
     }
 
+    /**
+     * Returns a map of supervisor IDs to worker category names.

Review Comment:
   ```suggestion
        * Returns a map from supervisor ID to worker category name to be used for tasks of that supervisor.
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java:
##########
@@ -120,10 +121,25 @@ public static ImmutableWorkerInfo selectWorker(
       if (categoryConfig != null) {
         final String defaultCategory = categoryConfig.getDefaultCategory();
         final Map<String, String> categoryAffinity = categoryConfig.getCategoryAffinity();
-
-        String preferredCategory = categoryAffinity.get(task.getDataSource());
-        // If there is no preferred category for the datasource, then using the defaultCategory. However, the defaultCategory
-        // may be null too, so we need to do one more null check (see below).
+        final Map<String, String> supervisorIdCategoryAffinity = categoryConfig.getSupervisorIdCategoryAffinity();
+
+        String preferredCategory = null;
+        
+        // First, check if this task has a supervisorId and if there's a category affinity for it
+        if (task instanceof SeekableStreamIndexTask) {
+          final String supervisorId = ((SeekableStreamIndexTask<?, ?, ?>) task).getSupervisorId();
+          if (supervisorId != null) {
+            preferredCategory = supervisorIdCategoryAffinity.get(supervisorId);
+          }
+        }
+        
+        // If no supervisor-based category is found, fall back to datasource-based category affinity
+        if (preferredCategory == null) {
+          preferredCategory = categoryAffinity.get(task.getDataSource());
+        }
+        
+        // If there is no preferred category for the supervisorId or datasource, then using the defaultCategory.

Review Comment:
   ```suggestion
           // If there is no preferred category for the supervisorId or datasource, then use the defaultCategory.
   ```
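For reference, the lookup order the new `WorkerSelectUtils` code implements (supervisor ID affinity first, then datasource affinity, then the default category, which may be null) can be sketched in isolation. This is a minimal sketch, not the actual Druid code: `CategorySelectionSketch`, `SketchTask`, `StreamingTask`, `BatchTask` and `selectCategory` are hypothetical stand-ins for `Task`, `SeekableStreamIndexTask` and the selection logic, and only the precedence mirrors the diff. It uses `Map.of`, so it assumes Java 9 or later.

```java
import java.util.Map;

/**
 * Hypothetical sketch of the category lookup order in WorkerSelectUtils:
 * supervisor ID affinity, then datasource affinity, then the default category.
 */
final class CategorySelectionSketch
{
  /** Hypothetical stand-in for Druid's Task; only the datasource is needed here. */
  interface SketchTask
  {
    String getDataSource();
  }

  /** Hypothetical stand-in for a non-streaming task, which has no supervisor ID. */
  static final class BatchTask implements SketchTask
  {
    private final String dataSource;

    BatchTask(String dataSource)
    {
      this.dataSource = dataSource;
    }

    @Override
    public String getDataSource()
    {
      return dataSource;
    }
  }

  /** Hypothetical stand-in for SeekableStreamIndexTask; the supervisor ID may be null. */
  static final class StreamingTask implements SketchTask
  {
    private final String supervisorId;
    private final String dataSource;

    StreamingTask(String supervisorId, String dataSource)
    {
      this.supervisorId = supervisorId;
      this.dataSource = dataSource;
    }

    String getSupervisorId()
    {
      return supervisorId;
    }

    @Override
    public String getDataSource()
    {
      return dataSource;
    }
  }

  /** Returns the preferred category, or null when no affinity matches and the default is null. */
  static String selectCategory(
      SketchTask task,
      Map<String, String> supervisorIdCategoryAffinity,
      Map<String, String> categoryAffinity,
      String defaultCategory
  )
  {
    String preferredCategory = null;

    // Only streaming tasks carry a supervisor ID, so check its affinity first.
    if (task instanceof StreamingTask) {
      final String supervisorId = ((StreamingTask) task).getSupervisorId();
      if (supervisorId != null) {
        preferredCategory = supervisorIdCategoryAffinity.get(supervisorId);
      }
    }

    // No supervisor-based category: fall back to the datasource-based affinity.
    if (preferredCategory == null) {
      preferredCategory = categoryAffinity.get(task.getDataSource());
    }

    // Still nothing: use the default category, which may itself be null.
    return preferredCategory != null ? preferredCategory : defaultCategory;
  }

  public static void main(String[] args)
  {
    Map<String, String> bySupervisor = Map.of("sup1", "c-supervisor");
    Map<String, String> byDatasource = Map.of("wiki", "c-datasource");

    // Supervisor affinity wins over datasource affinity: prints c-supervisor
    System.out.println(selectCategory(new StreamingTask("sup1", "wiki"), bySupervisor, byDatasource, "c-default"));
    // Unknown supervisor ID falls back to the datasource affinity: prints c-datasource
    System.out.println(selectCategory(new StreamingTask("sup2", "wiki"), bySupervisor, byDatasource, "c-default"));
    // Non-streaming task with no datasource affinity gets the default: prints c-default
    System.out.println(selectCategory(new BatchTask("other"), bySupervisor, byDatasource, "c-default"));
  }
}
```

Keeping the `instanceof` check in front of the supervisor lookup means non-streaming tasks never consult the supervisor map, so they behave exactly as before the change.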



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java:
##########
@@ -105,12 +109,31 @@ public String getDefaultCategory()
       return defaultCategory;
     }
 
+    /**
+     * Returns a map of datasource names to worker category names.
+     * Used to assign tasks to specific worker categories based on their datasource.
+     *
+     * @return map where key is datasource name and value is worker category name

Review Comment:
   Nit: Not needed since the first line already covers this info.
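Taken together, the Javadoc suggestions on `WorkerCategorySpec` (new first lines, `@return` tags dropped) would leave the two affinity getters looking roughly like the sketch below. `CategoryConfigSketch` is a hypothetical stand-in for the real config class: the field names are assumed from the getter names, `@JsonProperty` is omitted so it compiles without Jackson, and treating a null map as empty is an assumption, not something shown in the diff.

```java
import java.util.Collections;
import java.util.Map;

/**
 * Hypothetical stand-in for the category config in WorkerCategorySpec,
 * showing the getters' Javadoc with the review suggestions applied.
 */
final class CategoryConfigSketch
{
  private final String defaultCategory;
  private final Map<String, String> categoryAffinity;
  private final Map<String, String> supervisorIdCategoryAffinity;

  CategoryConfigSketch(
      String defaultCategory,
      Map<String, String> categoryAffinity,
      Map<String, String> supervisorIdCategoryAffinity
  )
  {
    this.defaultCategory = defaultCategory;
    // Assumption: null maps are normalized to empty so callers can call get() without a null check.
    this.categoryAffinity = categoryAffinity == null ? Collections.emptyMap() : categoryAffinity;
    this.supervisorIdCategoryAffinity =
        supervisorIdCategoryAffinity == null ? Collections.emptyMap() : supervisorIdCategoryAffinity;
  }

  public String getDefaultCategory()
  {
    return defaultCategory;
  }

  /**
   * Returns a map from datasource name to the worker category name to be used for tasks of that datasource.
   * Used to assign tasks to specific worker categories based on their datasource.
   */
  public Map<String, String> getCategoryAffinity()
  {
    return categoryAffinity;
  }

  /**
   * Returns a map from supervisor ID to worker category name to be used for tasks of that supervisor.
   * Used to assign tasks to specific worker categories based on their supervisor ID.
   * This takes precedence over {@link #getCategoryAffinity()} when both are configured.
   */
  public Map<String, String> getSupervisorIdCategoryAffinity()
  {
    return supervisorIdCategoryAffinity;
  }

  public static void main(String[] args)
  {
    CategoryConfigSketch config = new CategoryConfigSketch("c1", Map.of("wiki", "c2"), null);
    System.out.println(config.getCategoryAffinity().get("wiki"));  // prints c2
    System.out.println(config.getSupervisorIdCategoryAffinity().isEmpty());  // prints true
  }
}
```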



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java:
##########
@@ -187,4 +293,27 @@ private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec)
 
     return worker;
   }
+
+  /**
+   * Helper method to create a test task with supervisor ID for testing
+   */
+  @SuppressWarnings("unchecked")
+  private static Task createTestTask(String id, @Nullable String supervisorId, String datasource)
+  {
+    return new TestSeekableStreamIndexTask(
+        id,
+        supervisorId,
+        null,
+        DataSchema.builder()
+            .withDataSource(datasource)
+            .withTimestamp(new TimestampSpec(null, null, null))
+            .withDimensions(new DimensionsSpec(Collections.emptyList()))
+            .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList()))

Review Comment:
   Nit: formatting seems off.


