This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c23a1e971dc Add GET /minions/status API endpoint (#17475)
c23a1e971dc is described below

commit c23a1e971dceafae424ccd1d6c5b829ecf3c8e78
Author: Krishna Koganti <[email protected]>
AuthorDate: Fri Jan 9 12:53:02 2026 -0800

    Add GET /minions/status API endpoint (#17475)
    
    * Add GET /minions/status API endpoint
    
    * Add support for includeTaskCounts filter
---
 .../pinot/controller/api/resources/Constants.java  |   1 +
 .../api/resources/MinionStatusResponse.java        | 125 ++++
 .../api/resources/PinotMinionRestletResource.java  |  95 +++
 .../core/minion/PinotHelixTaskResourceManager.java | 128 +++-
 .../api/PinotMinionRestletResourceTest.java        | 153 ++++
 .../PinotHelixResourceManagerMinionStatusTest.java | 812 +++++++++++++++++++++
 .../utils/builder/ControllerRequestURLBuilder.java |  16 +
 7 files changed, 1329 insertions(+), 1 deletion(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index 570572fca94..ef30166c5d8 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -45,6 +45,7 @@ public class Constants {
   public static final String SCHEMA_TAG = "Schema";
   public static final String TENANT_TAG = "Tenant";
   public static final String BROKER_TAG = "Broker";
+  public static final String MINION_TAG = "Minion";
   public static final String SEGMENT_TAG = "Segment";
   public static final String TASK_TAG = "Task";
   public static final String LEAD_CONTROLLER_TAG = "Leader";
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/MinionStatusResponse.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/MinionStatusResponse.java
new file mode 100644
index 00000000000..88691e342b0
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/MinionStatusResponse.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Response object for the GET /minions/status endpoint.
+ * Provides status information for all minion instances including their task 
counts and drain state.
+ */
+public class MinionStatusResponse {
+  private int _currentMinionCount;
+  private List<MinionStatus> _minionStatus;
+
+  public MinionStatusResponse() {
+  }
+
+  public MinionStatusResponse(int currentMinionCount, List<MinionStatus> 
minionStatus) {
+    _currentMinionCount = currentMinionCount;
+    _minionStatus = minionStatus;
+  }
+
+  @JsonProperty("currentMinionCount")
+  public int getCurrentMinionCount() {
+    return _currentMinionCount;
+  }
+
+  public void setCurrentMinionCount(int currentMinionCount) {
+    _currentMinionCount = currentMinionCount;
+  }
+
+  @JsonProperty("minionStatus")
+  public List<MinionStatus> getMinionStatus() {
+    return _minionStatus;
+  }
+
+  public void setMinionStatus(List<MinionStatus> minionStatus) {
+    _minionStatus = minionStatus;
+  }
+
+  /**
+   * Status information for a single minion instance.
+   */
+  public static class MinionStatus {
+    private String _instanceId;
+    private String _host;
+    private int _port;
+    private int _runningTaskCount;
+    private String _status;
+
+    public MinionStatus() {
+    }
+
+    public MinionStatus(String instanceId, String host, int port, int 
runningTaskCount, String status) {
+      _instanceId = instanceId;
+      _host = host;
+      _port = port;
+      _runningTaskCount = runningTaskCount;
+      _status = status;
+    }
+
+    @JsonProperty("instanceId")
+    public String getInstanceId() {
+      return _instanceId;
+    }
+
+    public void setInstanceId(String instanceId) {
+      _instanceId = instanceId;
+    }
+
+    @JsonProperty("host")
+    public String getHost() {
+      return _host;
+    }
+
+    public void setHost(String host) {
+      _host = host;
+    }
+
+    @JsonProperty("port")
+    public int getPort() {
+      return _port;
+    }
+
+    public void setPort(int port) {
+      _port = port;
+    }
+
+    @JsonProperty("runningTaskCount")
+    public int getRunningTaskCount() {
+      return _runningTaskCount;
+    }
+
+    public void setRunningTaskCount(int runningTaskCount) {
+      _runningTaskCount = runningTaskCount;
+    }
+
+    @JsonProperty("status")
+    public String getStatus() {
+      return _status;
+    }
+
+    public void setStatus(String status) {
+      _status = status;
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotMinionRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotMinionRestletResource.java
new file mode 100644
index 00000000000..06dcd02b06e
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotMinionRestletResource.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import javax.inject.Inject;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.TargetType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.spi.utils.CommonConstants.DATABASE;
+import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ * REST API for minion-related operations
+ */
+@Api(tags = Constants.MINION_TAG, authorizations = {@Authorization(value = 
SWAGGER_AUTHORIZATION_KEY),
+    @Authorization(value = DATABASE)})
+@SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = {
+    @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
+        key = SWAGGER_AUTHORIZATION_KEY,
+        description = "The format of the key is  ```\"Basic <token>\" or 
\"Bearer <token>\"```"),
+    @ApiKeyAuthDefinition(name = DATABASE, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE,
+        description = "Database context passed through http header. If no 
context is provided 'default' database "
+            + "context will be considered.")}))
+@Path("/")
+public class PinotMinionRestletResource {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotMinionRestletResource.class);
+
+  @Inject
+  PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
+
+  @GET
+  @Path("/minions/status")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_INSTANCE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status of all minion instances including task 
counts and drain state",
+      notes = "Returns status information for all minion instances registered 
in the cluster")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 400, message = "Bad request - invalid status 
filter"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
+  public MinionStatusResponse getMinionStatus(
+      @ApiParam(value = "Filter by status (ONLINE, OFFLINE, or DRAINED)")
+      @QueryParam("status") String status,
+      @ApiParam(value = "Include running task counts for each minion")
+      @QueryParam("includeTaskCounts") @DefaultValue("false") boolean 
includeTaskCounts) {
+    try {
+      return _pinotHelixTaskResourceManager.getMinionStatus(status, 
includeTaskCounts);
+    } catch (IllegalArgumentException e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.BAD_REQUEST);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, "Failed to get minion 
status",
+          Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 2da272c1c1b..4087c8fa9f4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -42,6 +42,7 @@ import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
@@ -56,6 +57,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
 import org.apache.pinot.common.utils.DateTimeUtils;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.controller.api.resources.MinionStatusResponse;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.CompletionServiceHelper;
 import org.apache.pinot.core.common.MinionConstants;
@@ -64,6 +66,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1299,7 +1302,7 @@ public class PinotHelixTaskResourceManager {
    * <p>E.g. TaskQueue_DummyTask -> DummyTask (from Helix JobQueue name)
    * <p>E.g. TaskQueue_DummyTask_Task_DummyTask_12345 -> DummyTask (from Helix 
Job name)
    *
-   * @param name Pinot task name, Helix JobQueue name or Helix Job name
+   * @param name Pinot task name, Helix JobQueue name, or Helix Job name
    * @return Task type
    */
   public static String getTaskType(String name) {
@@ -1340,6 +1343,129 @@ public class PinotHelixTaskResourceManager {
     return 
MinionTaskMetadataUtils.getAllTaskMetadataLastUpdateTimeMs(propertyStore);
   }
 
+  /**
+   * Gets the status of all minion instances, including their task counts and 
drain state.
+   *
+   * @param statusFilter Optional case-insensitive filter by status ("ONLINE", "OFFLINE", or 
"DRAINED"). If null or empty, returns all minions.
+   * @param includeTaskCounts Whether to include running task counts for each 
minion. When false, all counts are reported as 0.
+   * @return MinionStatusResponse containing status information for minion 
instances
+   */
+  public MinionStatusResponse getMinionStatus(String statusFilter, boolean 
includeTaskCounts) {
+    // Validate status filter
+    if (statusFilter != null && !statusFilter.isEmpty()) {
+      if (!"ONLINE".equalsIgnoreCase(statusFilter)
+          && !"OFFLINE".equalsIgnoreCase(statusFilter)
+          && !"DRAINED".equalsIgnoreCase(statusFilter)) {
+        throw new IllegalArgumentException("Invalid status filter. Must be 
'ONLINE', 'OFFLINE', or 'DRAINED'");
+      }
+    }
+
+    // Get all instances and filter for minions, then sort them
+    List<String> allInstances = _helixResourceManager.getAllInstances();
+    List<String> minionInstances = allInstances.stream()
+        .filter(InstanceTypeUtils::isMinion)
+        .sorted()
+        .collect(Collectors.toList());
+
+    // Get running task counts per minion only if requested (can be expensive)
+    Map<String, Integer> runningTaskCounts = new HashMap<>();
+    if (includeTaskCounts) {
+      try {
+        runningTaskCounts = getRunningTaskCountsPerMinion();
+      } catch (Exception e) {
+        LOGGER.warn("Failed to get running task counts from task resource 
manager", e);
+        // Continue with an empty map - task counts will be 0
+      }
+    }
+
+    // Build status list for each minion
+    List<MinionStatusResponse.MinionStatus> minionStatusList = new 
ArrayList<>();
+    for (String minionInstanceId : minionInstances) {
+      InstanceConfig instanceConfig = 
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+      if (instanceConfig == null) {
+        continue;
+      }
+
+      // Determine minion status: OFFLINE (disabled in Helix), DRAINED, or 
ONLINE
+      boolean isEnabled = instanceConfig.getInstanceEnabled();
+      List<String> tags = instanceConfig.getTags();
+      boolean isDrained = tags != null && 
tags.contains(Helix.DRAINED_MINION_INSTANCE);
+
+      String status;
+      if (!isEnabled) {
+        status = "OFFLINE";
+      } else if (isDrained) {
+        status = "DRAINED";
+      } else {
+        status = "ONLINE";
+      }
+
+      // Apply status filter if specified
+      if (statusFilter != null && !statusFilter.isEmpty() && 
!status.equalsIgnoreCase(statusFilter)) {
+        continue;
+      }
+
+      // Get host and port
+      String host = instanceConfig.getHostName();
+      int port = Integer.parseInt(instanceConfig.getPort());
+
+      // Get the running task count for this minion
+      int runningTaskCount = runningTaskCounts.getOrDefault(minionInstanceId, 
0);
+
+      MinionStatusResponse.MinionStatus minionStatus =
+          new MinionStatusResponse.MinionStatus(minionInstanceId, host, port, 
runningTaskCount, status);
+      minionStatusList.add(minionStatus);
+    }
+
+    return new MinionStatusResponse(minionStatusList.size(), minionStatusList);
+  }
+
+  /**
+   * Gets the count of running tasks for each minion instance.
+   * This method iterates through all task workflows and jobs to count tasks 
in INIT or RUNNING state
+   * assigned to each minion.
+   *
+   * @return Map of minion instance ID to running task count
+   */
+  public synchronized Map<String, Integer> getRunningTaskCountsPerMinion() {
+    Map<String, Integer> runningTaskCounts = new HashMap<>();
+
+    // Get all workflows (task queues)
+    Set<String> workflows = _taskDriver.getWorkflows().keySet();
+
+    for (String workflow : workflows) {
+      WorkflowContext workflowContext = 
_taskDriver.getWorkflowContext(workflow);
+      if (workflowContext == null) {
+        continue;
+      }
+
+      // Get all jobs in this workflow
+      Set<String> jobs = workflowContext.getJobStates().keySet();
+      for (String job : jobs) {
+        JobContext jobContext = _taskDriver.getJobContext(job);
+        if (jobContext == null) {
+          continue;
+        }
+
+        // Iterate through all partitions (subtasks) in this job
+        Set<Integer> partitions = jobContext.getPartitionSet();
+        for (Integer partition : partitions) {
+          TaskPartitionState state = jobContext.getPartitionState(partition);
+          // Count INIT and RUNNING tasks (INIT means assigned but not yet 
started)
+          if (state == TaskPartitionState.RUNNING || state == 
TaskPartitionState.INIT) {
+            String assignedParticipant = 
jobContext.getAssignedParticipant(partition);
+            if (assignedParticipant != null) {
+              runningTaskCounts.put(assignedParticipant,
+                  runningTaskCounts.getOrDefault(assignedParticipant, 0) + 1);
+            }
+          }
+        }
+      }
+    }
+
+    return runningTaskCounts;
+  }
+
   @JsonPropertyOrder({
       "taskState", "subtaskCount", "startTime", "executionStartTime", 
"finishTime", "triggeredBy",
       "subtaskInfos"
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotMinionRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotMinionRestletResourceTest.java
new file mode 100644
index 00000000000..21ddcfcea59
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotMinionRestletResourceTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.spi.config.instance.Instance;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class PinotMinionRestletResourceTest extends ControllerTest {
+
+    private ControllerRequestURLBuilder _urlBuilder = null;
+
+    @BeforeClass
+    public void setUp()
+        throws Exception {
+      DEFAULT_INSTANCE.setupSharedStateAndValidate();
+      _urlBuilder = DEFAULT_INSTANCE.getControllerRequestURLBuilder();
+    }
+
+  @Test
+  public void testMinionStatusEndpoint()
+      throws Exception {
+    // Create 5 minion instances
+    List<String> minionIds = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      String host = "minion-status-test-" + i + ".example.com";
+      int port = 9514;
+      Instance minionInstance = new Instance(host, port, InstanceType.MINION,
+          
Collections.singletonList(CommonConstants.Helix.UNTAGGED_MINION_INSTANCE), 
null, 0, 0, 0, 0, false);
+      sendPostRequest(_urlBuilder.forInstanceCreate(), 
minionInstance.toJsonString());
+      minionIds.add("Minion_" + host + "_" + port);
+    }
+
+    // Test 1: Get all minions (no filter)
+    String allMinionsUrl = _urlBuilder.forMinionStatus(null, false);
+    String allMinionsResponse = sendGetRequest(allMinionsUrl);
+    JsonNode allMinionsJson = JsonUtils.stringToJsonNode(allMinionsResponse);
+
+    assertNotNull(allMinionsJson.get("currentMinionCount"));
+    // Should have at least our 5 minions (may have more from other tests)
+    assertTrue(allMinionsJson.get("currentMinionCount").asInt() >= 5);
+    assertNotNull(allMinionsJson.get("minionStatus"));
+    assertTrue(allMinionsJson.get("minionStatus").isArray());
+
+    // Verify the structure of minion status
+    for (JsonNode minionStatus : allMinionsJson.get("minionStatus")) {
+      assertNotNull(minionStatus.get("instanceId"));
+      assertNotNull(minionStatus.get("host"));
+      assertNotNull(minionStatus.get("port"));
+      assertNotNull(minionStatus.get("runningTaskCount"));
+      assertNotNull(minionStatus.get("status"));
+      assertEquals(minionStatus.get("status").asText(), "ONLINE");
+    }
+
+    // Test 2: Drain 2 minions
+    String minion0 = minionIds.get(0);
+    String minion2 = minionIds.get(2);
+    sendPutRequest(_urlBuilder.forInstanceState(minion0) + "?state=DRAIN", "");
+    sendPutRequest(_urlBuilder.forInstanceState(minion2) + "?state=DRAIN", "");
+
+    // Test 3: Get only DRAINED minions
+    String drainedMinionsUrl = _urlBuilder.forMinionStatus("DRAINED", false);
+    String drainedMinionsResponse = sendGetRequest(drainedMinionsUrl);
+    JsonNode drainedMinionsJson = 
JsonUtils.stringToJsonNode(drainedMinionsResponse);
+
+    assertTrue(drainedMinionsJson.get("currentMinionCount").asInt() >= 2);
+    for (JsonNode minionStatus : drainedMinionsJson.get("minionStatus")) {
+      assertEquals(minionStatus.get("status").asText(), "DRAINED");
+    }
+
+    // Test 4: Get only ONLINE minions
+    String onlineMinionsUrl = _urlBuilder.forMinionStatus("ONLINE", false);
+    String onlineMinionsResponse = sendGetRequest(onlineMinionsUrl);
+    JsonNode onlineMinionsJson = 
JsonUtils.stringToJsonNode(onlineMinionsResponse);
+
+    assertTrue(onlineMinionsJson.get("currentMinionCount").asInt() >= 3);
+    for (JsonNode minionStatus : onlineMinionsJson.get("minionStatus")) {
+      assertEquals(minionStatus.get("status").asText(), "ONLINE");
+    }
+
+    // Cleanup
+    for (String minionId : minionIds) {
+      sendDeleteRequest(_urlBuilder.forInstance(minionId));
+    }
+  }
+
+  @Test
+  public void testMinionStatusEndpointCaseInsensitive()
+      throws Exception {
+    // Create a minion and drain it
+    String host = "minion-case-test.example.com";
+    int port = 9514;
+    Instance minionInstance = new Instance(host, port, InstanceType.MINION,
+        
Collections.singletonList(CommonConstants.Helix.UNTAGGED_MINION_INSTANCE), 
null, 0, 0, 0, 0, false);
+    sendPostRequest(_urlBuilder.forInstanceCreate(), 
minionInstance.toJsonString());
+    String minionId = "Minion_" + host + "_" + port;
+    sendPutRequest(_urlBuilder.forInstanceState(minionId) + "?state=DRAIN", 
"");
+
+    // Test case-insensitive status filter
+    String upperCaseUrl = _urlBuilder.forMinionStatus("DRAINED", false);
+    String lowerCaseUrl = _urlBuilder.forMinionStatus("drained", false);
+
+    String upperResponse = sendGetRequest(upperCaseUrl);
+    String lowerResponse = sendGetRequest(lowerCaseUrl);
+
+    JsonNode upperJson = JsonUtils.stringToJsonNode(upperResponse);
+    JsonNode lowerJson = JsonUtils.stringToJsonNode(lowerResponse);
+
+    // Both filters should match at least the drained minion created above
+    assertTrue(upperJson.get("currentMinionCount").asInt() >= 1);
+    assertTrue(lowerJson.get("currentMinionCount").asInt() >= 1);
+
+    // Cleanup
+    sendDeleteRequest(_urlBuilder.forInstance(minionId));
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    DEFAULT_INSTANCE.cleanup();
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerMinionStatusTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerMinionStatusTest.java
new file mode 100644
index 00000000000..91274a9aa82
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerMinionStatusTest.java
@@ -0,0 +1,812 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.pinot.controller.api.resources.MinionStatusResponse;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.spi.config.instance.Instance;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for the minion status query functionality in {@link PinotHelixTaskResourceManager}.
+ */
+public class PinotHelixResourceManagerMinionStatusTest extends ControllerTest {
+
+  // Task resource manager obtained from the controller started in setUp()
+  private PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
+
+  /**
+   * Runs once before the tests of this class: calls {@code startZk()} and {@code startController()}, then caches
+   * the task resource manager exposed by the controller starter.
+   */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    startZk();
+    startController();
+    _pinotHelixTaskResourceManager = 
_controllerStarter.getHelixTaskResourceManager();
+  }
+
+  /**
+   * Best-effort isolation hook: after every test, drops each registered instance whose ID starts with
+   * {@code "Minion_"}. Any failure, either while listing the instances or while dropping one of them, is
+   * deliberately ignored so that cleanup problems never mask the outcome of the test itself.
+   */
+  @AfterMethod
+  public void cleanupAfterTest() {
+    try {
+      for (String candidateId : _helixResourceManager.getAllInstances()) {
+        if (!candidateId.startsWith("Minion_")) {
+          continue;
+        }
+        try {
+          _helixResourceManager.dropInstance(candidateId);
+        } catch (Exception ignored) {
+          // A single failed drop must not stop the remaining minions from being removed
+        }
+      }
+    } catch (Exception ignored) {
+      // Listing the instances failed, so there is nothing that can be cleaned up
+    }
+  }
+
+  /**
+   * Builds a mocked {@link TaskDriver} exposing a single workflow ({@code TestWorkflow1}) that holds a single
+   * {@code IN_PROGRESS} job ({@code TestJob1}). Every requested task becomes one partition of that job, reported in
+   * state {@code RUNNING} and assigned to the given minion; partition IDs are handed out sequentially from 0.
+   *
+   * @param taskAssignments Map of minion instance ID to number of RUNNING tasks
+   * @return Mocked TaskDriver
+   */
+  private TaskDriver createMockTaskDriverWithRunningTasks(Map<String, Integer> taskAssignments) {
+    // Job level: one RUNNING partition per requested task
+    JobContext jobContext = mock(JobContext.class);
+    Set<Integer> partitionIds = new HashSet<>();
+    taskAssignments.forEach((participant, runningTasks) -> {
+      for (int remaining = runningTasks; remaining > 0; remaining--) {
+        // IDs are sequential, so the current set size is always the next unused partition ID
+        int nextId = partitionIds.size();
+        partitionIds.add(nextId);
+        when(jobContext.getPartitionState(nextId)).thenReturn(TaskPartitionState.RUNNING);
+        when(jobContext.getAssignedParticipant(nextId)).thenReturn(participant);
+      }
+    });
+    when(jobContext.getPartitionSet()).thenReturn(partitionIds);
+
+    // Workflow level: the workflow context lists the single in-progress job
+    Map<String, TaskState> jobStates = new HashMap<>();
+    jobStates.put("TestJob1", TaskState.IN_PROGRESS);
+    WorkflowContext workflowContext = mock(WorkflowContext.class);
+    when(workflowContext.getJobStates()).thenReturn(jobStates);
+
+    // Driver level: getWorkflows() returns a Map<String, WorkflowConfig>
+    Map<String, WorkflowConfig> workflowConfigs = new HashMap<>();
+    workflowConfigs.put("TestWorkflow1", mock(WorkflowConfig.class));
+    TaskDriver taskDriver = mock(TaskDriver.class);
+    when(taskDriver.getWorkflows()).thenReturn(workflowConfigs);
+    when(taskDriver.getWorkflowContext("TestWorkflow1")).thenReturn(workflowContext);
+    when(taskDriver.getJobContext("TestJob1")).thenReturn(jobContext);
+    return taskDriver;
+  }
+
+  /**
+   * With no minion registered and no task assigned, the status response must be non-null, report a minion count
+   * of zero and carry an empty (non-null) status list.
+   */
+  @Test
+  public void testGetMinionStatusWithNoMinions() {
+    Map<String, Integer> noAssignments = new HashMap<>();
+    PinotHelixTaskResourceManager taskResourceManager =
+        new PinotHelixTaskResourceManager(_helixResourceManager, createMockTaskDriverWithRunningTasks(noAssignments));
+
+    MinionStatusResponse emptyStatus = taskResourceManager.getMinionStatus(null, true);
+
+    assertNotNull(emptyStatus);
+    assertEquals(emptyStatus.getCurrentMinionCount(), 0);
+    assertNotNull(emptyStatus.getMinionStatus());
+    assertTrue(emptyStatus.getMinionStatus().isEmpty());
+  }
+
+  /**
+   * Registers three minions and checks that all of them are reported {@code ONLINE} with instance ID, host and a
+   * positive port, and that their running task counts follow the mocked assignments (2, 1 and 0).
+   */
+  @Test
+  public void testGetMinionStatusWithOnlineMinions() {
+    int minionPort = 9514;
+    for (int i = 0; i < 3; i++) {
+      Instance minionInstance = new Instance("minion-test-" + i + ".example.com", minionPort, InstanceType.MINION,
+          Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+      assertTrue(_helixResourceManager.addInstance(minionInstance, false).isSuccessful(),
+          "Failed to add minion instance");
+    }
+
+    // minion-0 runs 2 tasks and minion-1 runs 1 task; minion-2 is absent from the map and therefore runs none
+    Map<String, Integer> taskAssignments = new HashMap<>();
+    taskAssignments.put("Minion_minion-test-0.example.com_9514", 2);
+    taskAssignments.put("Minion_minion-test-1.example.com_9514", 1);
+    PinotHelixTaskResourceManager taskResourceManager = new PinotHelixTaskResourceManager(
+        _helixResourceManager, createMockTaskDriverWithRunningTasks(taskAssignments));
+
+    MinionStatusResponse response = taskResourceManager.getMinionStatus(null, true);
+    assertNotNull(response);
+    assertEquals(response.getCurrentMinionCount(), 3);
+    assertEquals(response.getMinionStatus().size(), 3);
+
+    for (MinionStatusResponse.MinionStatus status : response.getMinionStatus()) {
+      assertEquals(status.getStatus(), "ONLINE");
+      assertNotNull(status.getInstanceId());
+      assertNotNull(status.getHost());
+      assertTrue(status.getPort() > 0);
+      // Instances missing from the assignment map are expected to report zero running tasks
+      int expectedRunning = taskAssignments.getOrDefault(status.getInstanceId(), 0);
+      assertEquals(status.getRunningTaskCount(), expectedRunning);
+    }
+
+    // Cleanup
+    for (int i = 0; i < 3; i++) {
+      _helixResourceManager.dropInstance("Minion_minion-test-" + i + ".example.com_" + minionPort);
+    }
+  }
+
+  /**
+   * Registers five minions, drains two of them and checks the unfiltered response as well as the {@code ONLINE}
+   * and {@code DRAINED} filters. Drained minions are expected to keep reporting their mocked running task counts.
+   */
+  @Test
+  public void testGetMinionStatusWithDrainedMinions() {
+    for (int i = 0; i < 5; i++) {
+      Instance minionInstance = new Instance("minion-drain-test-" + i + ".example.com", 9514, InstanceType.MINION,
+          Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+      assertTrue(_helixResourceManager.addInstance(minionInstance, false).isSuccessful());
+    }
+
+    // Minions 1 and 3 are drained; 0, 2 and 4 stay online
+    String minion1 = "Minion_minion-drain-test-1.example.com_9514";
+    String minion3 = "Minion_minion-drain-test-3.example.com_9514";
+    _helixResourceManager.drainMinionInstance(minion1);
+    _helixResourceManager.drainMinionInstance(minion3);
+
+    // The drained minions still own running tasks in the mock
+    Map<String, Integer> taskAssignments = new HashMap<>();
+    taskAssignments.put("Minion_minion-drain-test-0.example.com_9514", 3);
+    taskAssignments.put(minion1, 1);
+    taskAssignments.put("Minion_minion-drain-test-2.example.com_9514", 2);
+    taskAssignments.put(minion3, 2);
+    PinotHelixTaskResourceManager taskResourceManager = new PinotHelixTaskResourceManager(
+        _helixResourceManager, createMockTaskDriverWithRunningTasks(taskAssignments));
+
+    // No filter: all five minions, split 3 ONLINE / 2 DRAINED
+    MinionStatusResponse allResponse = taskResourceManager.getMinionStatus(null, true);
+    assertEquals(allResponse.getCurrentMinionCount(), 5);
+    assertEquals(allResponse.getMinionStatus().size(), 5);
+    long onlineCount = 0;
+    long drainedCount = 0;
+    for (MinionStatusResponse.MinionStatus status : allResponse.getMinionStatus()) {
+      if ("ONLINE".equals(status.getStatus())) {
+        onlineCount++;
+      } else if ("DRAINED".equals(status.getStatus())) {
+        drainedCount++;
+      }
+    }
+    assertEquals(onlineCount, 3);
+    assertEquals(drainedCount, 2);
+
+    // ONLINE filter
+    MinionStatusResponse onlineResponse = taskResourceManager.getMinionStatus("ONLINE", true);
+    assertEquals(onlineResponse.getCurrentMinionCount(), 3);
+    assertEquals(onlineResponse.getMinionStatus().size(), 3);
+    onlineResponse.getMinionStatus().forEach(online -> assertEquals(online.getStatus(), "ONLINE"));
+
+    // DRAINED filter: only minions 1 and 3, each with its mocked task count
+    MinionStatusResponse drainedResponse = taskResourceManager.getMinionStatus("DRAINED", true);
+    assertEquals(drainedResponse.getCurrentMinionCount(), 2);
+    assertEquals(drainedResponse.getMinionStatus().size(), 2);
+    for (MinionStatusResponse.MinionStatus drained : drainedResponse.getMinionStatus()) {
+      assertEquals(drained.getStatus(), "DRAINED");
+      String drainedId = drained.getInstanceId();
+      assertTrue(drainedId.equals(minion1) || drainedId.equals(minion3));
+      assertEquals(drained.getRunningTaskCount(), drainedId.equals(minion1) ? 1 : 2);
+    }
+
+    // Cleanup
+    for (int i = 0; i < 5; i++) {
+      _helixResourceManager.dropInstance("Minion_minion-drain-test-" + i + ".example.com_" + 9514);
+    }
+  }
+
+  /**
+   * Registers six minions, drains the even-numbered ones and checks that the {@code DRAINED} and {@code ONLINE}
+   * filters each return exactly three minions, all carrying the requested status.
+   */
+  @Test
+  public void testGetMinionStatusWithFilter() {
+    int minionCount = 6;
+    String[] instanceIds = new String[minionCount];
+    for (int i = 0; i < minionCount; i++) {
+      instanceIds[i] = "Minion_minion-filter-" + i + ".example.com_" + 9514;
+      Instance minionInstance = new Instance("minion-filter-" + i + ".example.com", 9514, InstanceType.MINION,
+          Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+      assertTrue(_helixResourceManager.addInstance(minionInstance, false).isSuccessful());
+    }
+
+    // Drain minions 0, 2 and 4; minions 1, 3 and 5 stay online
+    for (int i = 0; i < minionCount; i += 2) {
+      _helixResourceManager.drainMinionInstance(instanceIds[i]);
+    }
+
+    // Both drained and online minions own running tasks in the mock (index i -> runningTasks[i])
+    int[] runningTasks = {2, 3, 1, 2, 1, 1};
+    Map<String, Integer> taskAssignments = new HashMap<>();
+    for (int i = 0; i < minionCount; i++) {
+      taskAssignments.put(instanceIds[i], runningTasks[i]);
+    }
+    PinotHelixTaskResourceManager taskResourceManager = new PinotHelixTaskResourceManager(
+        _helixResourceManager, createMockTaskDriverWithRunningTasks(taskAssignments));
+
+    // Each filter must select exactly the three minions in that state
+    for (String expectedStatus : new String[]{"DRAINED", "ONLINE"}) {
+      MinionStatusResponse filtered = taskResourceManager.getMinionStatus(expectedStatus, true);
+      assertEquals(filtered.getCurrentMinionCount(), 3);
+      assertEquals(filtered.getMinionStatus().size(), 3);
+      for (MinionStatusResponse.MinionStatus status : filtered.getMinionStatus()) {
+        assertEquals(status.getStatus(), expectedStatus);
+      }
+    }
+
+    // Cleanup
+    for (String instanceId : instanceIds) {
+      _helixResourceManager.dropInstance(instanceId);
+    }
+  }
+
+  /**
+   * An unknown status filter must be rejected with an {@link IllegalArgumentException}.
+   *
+   * <p>The expectation is scoped to the {@code getMinionStatus} call alone, so an
+   * {@code IllegalArgumentException} raised while registering the minion or building the mock cannot make the test
+   * pass by accident.
+   */
+  @Test
+  public void testGetMinionStatusWithInvalidFilter() {
+    String minionHost = "minion-invalid-filter.example.com";
+    int minionPort = 9514;
+    String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+    Instance minionInstance = new Instance(minionHost, minionPort, InstanceType.MINION,
+        Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+    assertTrue(_helixResourceManager.addInstance(minionInstance, false).isSuccessful(),
+        "Failed to add minion instance");
+
+    try {
+      // Create a mock TaskDriver with one running task on the minion
+      Map<String, Integer> taskAssignments = new HashMap<>();
+      taskAssignments.put(minionInstanceId, 1);
+      PinotHelixTaskResourceManager taskResourceManager = new PinotHelixTaskResourceManager(
+          _helixResourceManager, createMockTaskDriverWithRunningTasks(taskAssignments));
+
+      assertThrows(IllegalArgumentException.class,
+          () -> taskResourceManager.getMinionStatus("INVALID_STATUS", true));
+    } finally {
+      // Cleanup
+      _helixResourceManager.dropInstance(minionInstanceId);
+    }
+  }
+
+  /**
+   * The status filter must be matched case-insensitively: {@code "DRAINED"}, {@code "drained"} and
+   * {@code "Drained"} must all select the single drained minion (and only that one), with its mocked running task
+   * count.
+   */
+  @Test
+  public void testGetMinionStatusCaseInsensitiveFilter() {
+    // Create 2 minions, drain one
+    String minionHost1 = "minion-case-test-1.example.com";
+    String minionHost2 = "minion-case-test-2.example.com";
+    int minionPort = 9514;
+    String minion1Id = "Minion_" + minionHost1 + "_" + minionPort;
+    String minion2Id = "Minion_" + minionHost2 + "_" + minionPort;
+
+    for (String minionHost : new String[]{minionHost1, minionHost2}) {
+      Instance minion = new Instance(minionHost, minionPort, InstanceType.MINION,
+          Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+      // Fail fast on a broken setup instead of on a confusing count mismatch later
+      assertTrue(_helixResourceManager.addInstance(minion, false).isSuccessful(), "Failed to add minion instance");
+    }
+    _helixResourceManager.drainMinionInstance(minion1Id);
+
+    // Create a mock TaskDriver with running tasks
+    Map<String, Integer> taskAssignments = new HashMap<>();
+    taskAssignments.put(minion1Id, 2); // Drained minion with 2 tasks
+    taskAssignments.put(minion2Id, 3); // Online minion with 3 tasks
+    PinotHelixTaskResourceManager taskResourceManager = new PinotHelixTaskResourceManager(
+        _helixResourceManager, createMockTaskDriverWithRunningTasks(taskAssignments));
+
+    // Every spelling of the filter must yield exactly the drained minion
+    for (String filter : new String[]{"DRAINED", "drained", "Drained"}) {
+      MinionStatusResponse response = taskResourceManager.getMinionStatus(filter, true);
+      assertEquals(response.getCurrentMinionCount(), 1);
+      assertEquals(response.getMinionStatus().size(), 1);
+
+      MinionStatusResponse.MinionStatus drained = response.getMinionStatus().get(0);
+      assertEquals(drained.getInstanceId(), minion1Id);
+      assertEquals(drained.getStatus(), "DRAINED");
+      assertEquals(drained.getRunningTaskCount(), 2);
+    }
+
+    // Cleanup
+    _helixResourceManager.dropInstance(minion1Id);
+    _helixResourceManager.dropInstance(minion2Id);
+  }
+
+  /**
+   * Registers one broker, one server and one minion and checks that only the minion shows up in the status
+   * response.
+   *
+   * <p>All three instances are dropped in a {@code finally} block: the per-test cleanup hook of this class only
+   * removes instances whose ID starts with {@code "Minion_"}, so the broker and the server would otherwise survive
+   * a failed assertion.
+   */
+  @Test
+  public void testGetMinionStatusOnlyReturnsMinions() {
+    // Create different instance types
+    String brokerHost = "broker-test.example.com";
+    String serverHost = "server-test.example.com";
+    String minionHost = "minion-type-test.example.com";
+
+    Instance brokerInstance = new Instance(brokerHost, 8099, InstanceType.BROKER,
+        Collections.singletonList("DefaultTenant_BROKER"), null, 0, 0, 0, 0, false);
+    Instance serverInstance = new Instance(serverHost, 8098, InstanceType.SERVER,
+        Collections.singletonList("DefaultTenant_OFFLINE"), null, 0, 0, 0, 0, false);
+    Instance minionInstance = new Instance(minionHost, 9514, InstanceType.MINION,
+        Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+
+    _helixResourceManager.addInstance(brokerInstance, false);
+    _helixResourceManager.addInstance(serverInstance, false);
+    _helixResourceManager.addInstance(minionInstance, false);
+
+    try {
+      // Create a mock TaskDriver with running tasks - only for the minion (broker and server have no task counts)
+      Map<String, Integer> taskAssignments = new HashMap<>();
+      taskAssignments.put("Minion_" + minionHost + "_9514", 5);
+
+      TaskDriver mockTaskDriver = createMockTaskDriverWithRunningTasks(taskAssignments);
+      PinotHelixTaskResourceManager taskResourceManager = new PinotHelixTaskResourceManager(
+          _helixResourceManager, mockTaskDriver);
+
+      // Get minion status - should only return the minion instance
+      MinionStatusResponse response = taskResourceManager.getMinionStatus(null, true);
+      assertEquals(response.getCurrentMinionCount(), 1);
+      assertEquals(response.getMinionStatus().size(), 1);
+      assertEquals(response.getMinionStatus().get(0).getHost(), minionHost);
+      assertEquals(response.getMinionStatus().get(0).getRunningTaskCount(), 5);
+    } finally {
+      // Cleanup
+      _helixResourceManager.dropInstance("Broker_" + brokerHost + "_8099");
+      _helixResourceManager.dropInstance("Server_" + serverHost + "_8098");
+      _helixResourceManager.dropInstance("Minion_" + minionHost + "_9514");
+    }
+  }
+
+  /**
+   * Registers a single minion and checks that host, port, instance ID and running task count in the response match
+   * the values the minion was created and mocked with.
+   */
+  @Test
+  public void testGetMinionStatusReturnsCorrectPortAndHost() {
+    String expectedHost = "specific-minion-host.example.com";
+    int expectedPort = 9514;
+    String expectedInstanceId = "Minion_" + expectedHost + "_" + expectedPort;
+
+    _helixResourceManager.addInstance(new Instance(expectedHost, expectedPort, InstanceType.MINION,
+        Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false), false);
+
+    // The mock reports four running tasks on the minion
+    Map<String, Integer> taskAssignments = new HashMap<>();
+    taskAssignments.put(expectedInstanceId, 4);
+    PinotHelixTaskResourceManager taskResourceManager = new PinotHelixTaskResourceManager(
+        _helixResourceManager, createMockTaskDriverWithRunningTasks(taskAssignments));
+
+    MinionStatusResponse response = taskResourceManager.getMinionStatus(null, true);
+    assertEquals(response.getCurrentMinionCount(), 1);
+
+    MinionStatusResponse.MinionStatus reported = response.getMinionStatus().get(0);
+    assertEquals(reported.getHost(), expectedHost);
+    assertEquals(reported.getPort(), expectedPort);
+    assertEquals(reported.getInstanceId(), expectedInstanceId);
+    assertEquals(reported.getRunningTaskCount(), 4);
+
+    // Cleanup
+    _helixResourceManager.dropInstance(expectedInstanceId);
+  }
+
+  /**
+   * Registers three minions with 3, 1 and 0 mocked running tasks and checks the per-minion counts.
+   *
+   * <p>The reported (instance ID, count) pairs are compared as a whole against the expected map, so a response
+   * containing an unexpected instance ID fails the test instead of being silently skipped.
+   */
+  @Test
+  public void testGetMinionStatusWithRunningTasks() {
+    // Create 3 minions
+    String minion1Id = "Minion_minion-running-1.example.com_9514";
+    String minion2Id = "Minion_minion-running-2.example.com_9514";
+    String minion3Id = "Minion_minion-running-3.example.com_9514";
+
+    for (int i = 1; i <= 3; i++) {
+      Instance minion = new Instance("minion-running-" + i + ".example.com", 9514, InstanceType.MINION,
+          Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+      assertTrue(_helixResourceManager.addInstance(minion, false).isSuccessful(), "Failed to add minion instance");
+    }
+
+    // Mocked running tasks: minion1 has 3, minion2 has 1, minion3 is not in the map and therefore has none
+    Map<String, Integer> taskAssignments = new HashMap<>();
+    taskAssignments.put(minion1Id, 3);
+    taskAssignments.put(minion2Id, 1);
+
+    TaskDriver mockTaskDriver = createMockTaskDriverWithRunningTasks(taskAssignments);
+    PinotHelixTaskResourceManager taskResourceManager = new PinotHelixTaskResourceManager(
+        _helixResourceManager, mockTaskDriver);
+
+    MinionStatusResponse response = taskResourceManager.getMinionStatus(null, true);
+
+    assertNotNull(response);
+    assertEquals(response.getCurrentMinionCount(), 3);
+    assertEquals(response.getMinionStatus().size(), 3);
+
+    // Expected view: the mocked assignments plus an explicit zero for the idle minion
+    Map<String, Integer> expectedCounts = new HashMap<>(taskAssignments);
+    expectedCounts.put(minion3Id, 0);
+
+    Map<String, Integer> actualCounts = new HashMap<>();
+    for (MinionStatusResponse.MinionStatus status : response.getMinionStatus()) {
+      actualCounts.put(status.getInstanceId(), status.getRunningTaskCount());
+    }
+    assertEquals(actualCounts, expectedCounts);
+
+    // Cleanup
+    _helixResourceManager.dropInstance(minion1Id);
+    _helixResourceManager.dropInstance(minion2Id);
+    _helixResourceManager.dropInstance(minion3Id);
+  }
+
+  /**
+   * Checks the per-minion running task counts reported for two minions with mocked task assignments (3 and 1).
+   *
+   * <p>NOTE(review): despite the method name, {@code createMockTaskDriverWithRunningTasks} only produces partitions
+   * in state {@code RUNNING}, so no {@code INIT} partition is exercised by this test.
+   * TODO: add an {@code INIT} partition to the mock once it is confirmed that the implementation counts that state.
+   */
+  @Test
+  public void testGetMinionStatusWithRunningTasksAndInitTasks() {
+    // Create 2 minions
+    String minion1Id = "Minion_minion-init-test-1.example.com_9514";
+    String minion2Id = "Minion_minion-init-test-2.example.com_9514";
+
+    Instance minion1 = new Instance("minion-init-test-1.example.com", 9514, InstanceType.MINION,
+        Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+    Instance minion2 = new Instance("minion-init-test-2.example.com", 9514, InstanceType.MINION,
+        Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+
+    _helixResourceManager.addInstance(minion1, false);
+    _helixResourceManager.addInstance(minion2, false);
+
+    // Mocked assignments: minion1 has 3 RUNNING tasks, minion2 has 1 RUNNING task
+    Map<String, Integer> taskAssignments = new HashMap<>();
+    taskAssignments.put(minion1Id, 3);
+    taskAssignments.put(minion2Id, 1);
+
+    TaskDriver mockTaskDriver = createMockTaskDriverWithRunningTasks(taskAssignments);
+    PinotHelixTaskResourceManager taskResourceManager = new PinotHelixTaskResourceManager(
+        _helixResourceManager, mockTaskDriver);
+
+    MinionStatusResponse response = taskResourceManager.getMinionStatus(null, true);
+
+    assertNotNull(response);
+    assertEquals(response.getCurrentMinionCount(), 2);
+
+    // Collect the reported counts per instance ID
+    Map<String, Integer> resultCounts = new HashMap<>();
+    for (MinionStatusResponse.MinionStatus status : response.getMinionStatus()) {
+      resultCounts.put(status.getInstanceId(), status.getRunningTaskCount());
+    }
+
+    // Comparing whole maps reports a missing or unexpected minion as an assertion failure rather than as a
+    // NullPointerException on a failed lookup
+    assertEquals(resultCounts, taskAssignments);
+
+    // Cleanup
+    _helixResourceManager.dropInstance(minion1Id);
+    _helixResourceManager.dropInstance(minion2Id);
+  }
+
+  /**
+   * A drained minion must keep reporting the running tasks mocked for it: it appears as {@code DRAINED} with two
+   * running tasks in the unfiltered and {@code DRAINED}-filtered responses, and is absent from the
+   * {@code ONLINE}-filtered one.
+   */
+  @Test
+  public void testGetMinionStatusWithDrainedMinionAndRunningTasks() {
+    String minionId = "Minion_minion-drained-running.example.com_9514";
+    _helixResourceManager.addInstance(new Instance("minion-drained-running.example.com", 9514, InstanceType.MINION,
+        Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false), false);
+    _helixResourceManager.drainMinionInstance(minionId);
+
+    // The mock still reports two running tasks on the drained minion
+    Map<String, Integer> taskAssignments = new HashMap<>();
+    taskAssignments.put(minionId, 2);
+    PinotHelixTaskResourceManager taskResourceManager = new PinotHelixTaskResourceManager(
+        _helixResourceManager, createMockTaskDriverWithRunningTasks(taskAssignments));
+
+    // No filter
+    MinionStatusResponse unfiltered = taskResourceManager.getMinionStatus(null, true);
+    assertEquals(unfiltered.getCurrentMinionCount(), 1);
+    MinionStatusResponse.MinionStatus reported = unfiltered.getMinionStatus().get(0);
+    assertEquals(reported.getInstanceId(), minionId);
+    assertEquals(reported.getStatus(), "DRAINED");
+    assertEquals(reported.getRunningTaskCount(), 2);
+
+    // DRAINED filter
+    MinionStatusResponse drainedOnly = taskResourceManager.getMinionStatus("DRAINED", true);
+    assertEquals(drainedOnly.getCurrentMinionCount(), 1);
+    assertEquals(drainedOnly.getMinionStatus().get(0).getRunningTaskCount(), 2);
+
+    // ONLINE filter
+    MinionStatusResponse onlineOnly = taskResourceManager.getMinionStatus("ONLINE", true);
+    assertEquals(onlineOnly.getCurrentMinionCount(), 0);
+
+    // Cleanup
+    _helixResourceManager.dropInstance(minionId);
+  }
+
+  /**
+   * When the task driver throws while listing workflows, {@code getMinionStatus} is expected to still return the
+   * registered minion, with a running task count of zero.
+   */
+  @Test
+  public void testGetMinionStatusRunningTaskCountException() {
+    String minionId = "Minion_minion-exception-test.example.com_9514";
+    _helixResourceManager.addInstance(new Instance("minion-exception-test.example.com", 9514, InstanceType.MINION,
+        Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false), false);
+
+    // Task driver whose workflow lookup always fails
+    TaskDriver failingDriver = mock(TaskDriver.class);
+    RuntimeException simulatedFailure = new RuntimeException("Simulated task driver failure");
+    Mockito.doThrow(simulatedFailure).when(failingDriver).getWorkflows();
+    PinotHelixTaskResourceManager taskResourceManager =
+        new PinotHelixTaskResourceManager(_helixResourceManager, failingDriver);
+
+    MinionStatusResponse response = taskResourceManager.getMinionStatus(null, true);
+
+    assertNotNull(response);
+    assertEquals(response.getCurrentMinionCount(), 1);
+    assertEquals(response.getMinionStatus().get(0).getRunningTaskCount(), 0);
+
+    // Cleanup
+    _helixResourceManager.dropInstance(minionId);
+  }
+
+  /**
+   * Registers four minions, disables one and drains another, then checks the status distribution
+   * (2 {@code ONLINE}, 1 {@code OFFLINE}, 1 {@code DRAINED}) and the {@code OFFLINE} filter, which must return the
+   * disabled minion with zero running tasks.
+   */
+  @Test
+  public void testGetMinionStatusWithOfflineMinions() {
+    String[] minionIds = new String[4];
+    for (int i = 0; i < minionIds.length; i++) {
+      minionIds[i] = "Minion_minion-offline-test-" + i + ".example.com_9514";
+      Instance minion = new Instance("minion-offline-test-" + i + ".example.com", 9514, InstanceType.MINION,
+          Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+      _helixResourceManager.addInstance(minion, false);
+    }
+    String onlineMinion1 = minionIds[0];
+    String offlineMinion = minionIds[1];
+    String onlineMinion2 = minionIds[2];
+    String drainedMinion = minionIds[3];
+
+    // One minion is disabled (expected OFFLINE), one is drained (expected DRAINED)
+    _helixResourceManager.disableInstance(offlineMinion);
+    _helixResourceManager.drainMinionInstance(drainedMinion);
+
+    // Only the two online minions get running tasks in the mock
+    Map<String, Integer> taskAssignments = new HashMap<>();
+    taskAssignments.put(onlineMinion1, 2);
+    taskAssignments.put(onlineMinion2, 3);
+    PinotHelixTaskResourceManager taskResourceManager = new PinotHelixTaskResourceManager(
+        _helixResourceManager, createMockTaskDriverWithRunningTasks(taskAssignments));
+
+    // No filter: all four minions, grouped by status
+    MinionStatusResponse allResponse = taskResourceManager.getMinionStatus(null, true);
+    assertEquals(allResponse.getCurrentMinionCount(), 4);
+    assertEquals(allResponse.getMinionStatus().size(), 4);
+    Map<String, Long> statusCounts = allResponse.getMinionStatus().stream()
+        .collect(Collectors.groupingBy(MinionStatusResponse.MinionStatus::getStatus, Collectors.counting()));
+    assertEquals(statusCounts.get("ONLINE").longValue(), 2);
+    assertEquals(statusCounts.get("OFFLINE").longValue(), 1);
+    assertEquals(statusCounts.get("DRAINED").longValue(), 1);
+
+    // OFFLINE filter: exactly the disabled minion
+    MinionStatusResponse offlineResponse = taskResourceManager.getMinionStatus("OFFLINE", true);
+    assertEquals(offlineResponse.getCurrentMinionCount(), 1);
+    assertEquals(offlineResponse.getMinionStatus().size(), 1);
+    MinionStatusResponse.MinionStatus offlineStatus = offlineResponse.getMinionStatus().get(0);
+    assertEquals(offlineStatus.getInstanceId(), offlineMinion);
+    assertEquals(offlineStatus.getStatus(), "OFFLINE");
+    assertEquals(offlineStatus.getRunningTaskCount(), 0);
+
+    // Cleanup
+    for (String minionId : minionIds) {
+      _helixResourceManager.dropInstance(minionId);
+    }
+  }
+
+  /**
+   * Verifies that {@code getMinionStatus(null, false)} reports a running task count of 0 for every minion, even
+   * though the mocked TaskDriver has running tasks assigned to both of them.
+   */
+  @Test
+  public void testGetMinionStatusWithoutTaskCounts() {
+    String minion1Id = "Minion_minion-no-tasks-1.example.com_9514";
+    String minion2Id = "Minion_minion-no-tasks-2.example.com_9514";
+
+    Instance minion1 = new Instance("minion-no-tasks-1.example.com", 9514, InstanceType.MINION,
+        Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+    Instance minion2 = new Instance("minion-no-tasks-2.example.com", 9514, InstanceType.MINION,
+        Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+
+    _helixResourceManager.addInstance(minion1, false);
+    _helixResourceManager.addInstance(minion2, false);
+
+    try {
+      // Create a mock TaskDriver with running tasks assigned to both minions
+      Map<String, Integer> taskAssignments = new HashMap<>();
+      taskAssignments.put(minion1Id, 5);
+      taskAssignments.put(minion2Id, 3);
+
+      TaskDriver mockTaskDriver = createMockTaskDriverWithRunningTasks(taskAssignments);
+      PinotHelixTaskResourceManager taskResourceManager =
+          new PinotHelixTaskResourceManager(_helixResourceManager, mockTaskDriver);
+
+      // Call with includeTaskCounts=false - all task counts should be 0
+      MinionStatusResponse response = taskResourceManager.getMinionStatus(null, false);
+
+      assertEquals(response.getCurrentMinionCount(), 2);
+      assertEquals(response.getMinionStatus().size(), 2);
+
+      // Every reported count must be 0 despite the assignments configured on the mock
+      for (MinionStatusResponse.MinionStatus status : response.getMinionStatus()) {
+        assertEquals(status.getRunningTaskCount(), 0);
+      }
+    } finally {
+      // Always drop the instances: the tests in this class share one cluster and assert exact minion counts, so a
+      // failed assertion above must not leave these minions registered.
+      _helixResourceManager.dropInstance(minion1Id);
+      _helixResourceManager.dropInstance(minion2Id);
+    }
+  }
+
+  /**
+   * Registers four minions (two left enabled, one disabled, one drained) and verifies that filtering
+   * {@code getMinionStatus} by "ONLINE", "OFFLINE" and "DRAINED" returns exactly the matching minions.
+   */
+  @Test
+  public void testGetMinionStatusMixedStates() {
+    String onlineMinion = "Minion_minion-mixed-0.example.com_9514";
+    String offlineMinion = "Minion_minion-mixed-1.example.com_9514";
+    String drainedMinion = "Minion_minion-mixed-2.example.com_9514";
+    String onlineMinion2 = "Minion_minion-mixed-3.example.com_9514";
+
+    for (int i = 0; i < 4; i++) {
+      Instance minion = new Instance("minion-mixed-" + i + ".example.com", 9514, InstanceType.MINION,
+          Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+      _helixResourceManager.addInstance(minion, false);
+    }
+
+    try {
+      // Set different states
+      _helixResourceManager.disableInstance(offlineMinion); // OFFLINE
+      _helixResourceManager.drainMinionInstance(drainedMinion); // DRAINED
+
+      // No tasks are assigned to the OFFLINE minion; it is expected to report 0 running tasks
+      Map<String, Integer> taskAssignments = new HashMap<>();
+      taskAssignments.put(onlineMinion, 1);
+      taskAssignments.put(drainedMinion, 3);
+      taskAssignments.put(onlineMinion2, 4);
+
+      TaskDriver mockTaskDriver = createMockTaskDriverWithRunningTasks(taskAssignments);
+      PinotHelixTaskResourceManager taskResourceManager =
+          new PinotHelixTaskResourceManager(_helixResourceManager, mockTaskDriver);
+
+      // Test filtering by each status
+      MinionStatusResponse onlineResponse = taskResourceManager.getMinionStatus("ONLINE", true);
+      assertEquals(onlineResponse.getCurrentMinionCount(), 2);
+      for (MinionStatusResponse.MinionStatus status : onlineResponse.getMinionStatus()) {
+        assertEquals(status.getStatus(), "ONLINE");
+        assertTrue(status.getRunningTaskCount() > 0);
+      }
+
+      MinionStatusResponse offlineResponse = taskResourceManager.getMinionStatus("OFFLINE", true);
+      assertEquals(offlineResponse.getCurrentMinionCount(), 1);
+      assertEquals(offlineResponse.getMinionStatus().get(0).getStatus(), "OFFLINE");
+      assertEquals(offlineResponse.getMinionStatus().get(0).getInstanceId(), offlineMinion);
+      assertEquals(offlineResponse.getMinionStatus().get(0).getRunningTaskCount(), 0);
+
+      MinionStatusResponse drainedResponse = taskResourceManager.getMinionStatus("DRAINED", true);
+      assertEquals(drainedResponse.getCurrentMinionCount(), 1);
+      assertEquals(drainedResponse.getMinionStatus().get(0).getStatus(), "DRAINED");
+      assertEquals(drainedResponse.getMinionStatus().get(0).getInstanceId(), drainedMinion);
+    } finally {
+      // Always drop the instances: the tests in this class share one cluster and assert exact minion counts, so a
+      // failed assertion above must not leave these minions registered.
+      for (int i = 0; i < 4; i++) {
+        _helixResourceManager.dropInstance("Minion_minion-mixed-" + i + ".example.com_9514");
+      }
+    }
+  }
+
+  /**
+   * An unrecognized status filter ("DISABLED") must make {@code getMinionStatus} throw
+   * {@link IllegalArgumentException}. The minion registered for the test is dropped again in all cases.
+   */
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testGetMinionStatusWithInvalidOfflineFilter() {
+    String host = "minion-invalid-offline.example.com";
+    int port = 9514;
+    String instanceId = "Minion_" + host + "_" + port;
+
+    Instance instance = new Instance(host, port, InstanceType.MINION,
+        Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+    _helixResourceManager.addInstance(instance, false);
+
+    // No running tasks are needed for this check, so the mock driver gets an empty assignment map
+    PinotHelixTaskResourceManager taskResourceManager = new PinotHelixTaskResourceManager(_helixResourceManager,
+        createMockTaskDriverWithRunningTasks(new HashMap<>()));
+
+    try {
+      // Expected to throw; the exception propagates to TestNG after the cleanup below has run
+      taskResourceManager.getMinionStatus("DISABLED", false);
+    } finally {
+      _helixResourceManager.dropInstance(instanceId);
+    }
+  }
+
+  /**
+   * Shuts down the test fixture after all tests in this class have run.
+   */
+  @AfterClass
+  public void tearDown() {
+    // The controller is stopped before ZooKeeper (presumably because the controller depends on ZK - confirm).
+    stopController();
+    stopZk();
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 2da3d879d0a..12fa14bcc82 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.utils.builder;
 
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -125,6 +126,21 @@ public class ControllerRequestURLBuilder {
     return StringUtil.join("/", _baseUrl, "tasks", "task", taskName);
   }
 
+  /**
+   * Builds the URL for the {@code GET /minions/status} endpoint.
+   *
+   * @param statusFilter optional status to filter by; left out of the query string when null or empty. The value is
+   *                     URL-encoded so that reserved characters cannot break or extend the query string.
+   * @param includeTaskCounts when true, {@code includeTaskCounts=true} is appended; when false the parameter is
+   *                          left out of the query string
+   * @return the request URL, with a query string only if at least one parameter was added
+   */
+  public String forMinionStatus(@Nullable String statusFilter, boolean includeTaskCounts) {
+    StringBuilder url = new StringBuilder(StringUtil.join("/", _baseUrl, "minions", "status"));
+    List<String> params = new ArrayList<>();
+    if (statusFilter != null && !statusFilter.isEmpty()) {
+      // Encode the caller-supplied value; characters such as '&', '=', '#' or spaces would otherwise corrupt the URL
+      params.add("status=" + URLEncoder.encode(statusFilter, StandardCharsets.UTF_8));
+    }
+    if (includeTaskCounts) {
+      params.add("includeTaskCounts=true");
+    }
+    if (!params.isEmpty()) {
+      url.append("?").append(String.join("&", params));
+    }
+    return url.toString();
+  }
+
   public String forStopMinionTaskQueue(String taskType) {
     return StringUtil.join("/", _baseUrl, "tasks", taskType, "stop");
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to