This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c23a1e971dc Add GET /minions/status API endpoint (#17475)
c23a1e971dc is described below
commit c23a1e971dceafae424ccd1d6c5b829ecf3c8e78
Author: Krishna Koganti <[email protected]>
AuthorDate: Fri Jan 9 12:53:02 2026 -0800
Add GET /minions/status API endpoint (#17475)
* Add GET /minions/status API endpoint
* Add support for includeTaskCounts filter
---
.../pinot/controller/api/resources/Constants.java | 1 +
.../api/resources/MinionStatusResponse.java | 125 ++++
.../api/resources/PinotMinionRestletResource.java | 95 +++
.../core/minion/PinotHelixTaskResourceManager.java | 128 +++-
.../api/PinotMinionRestletResourceTest.java | 153 ++++
.../PinotHelixResourceManagerMinionStatusTest.java | 812 +++++++++++++++++++++
.../utils/builder/ControllerRequestURLBuilder.java | 16 +
7 files changed, 1329 insertions(+), 1 deletion(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index 570572fca94..ef30166c5d8 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -45,6 +45,7 @@ public class Constants {
public static final String SCHEMA_TAG = "Schema";
public static final String TENANT_TAG = "Tenant";
public static final String BROKER_TAG = "Broker";
+ public static final String MINION_TAG = "Minion";
public static final String SEGMENT_TAG = "Segment";
public static final String TASK_TAG = "Task";
public static final String LEAD_CONTROLLER_TAG = "Leader";
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/MinionStatusResponse.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/MinionStatusResponse.java
new file mode 100644
index 00000000000..88691e342b0
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/MinionStatusResponse.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Response object for the GET /minions/status endpoint.
+ * Provides status information for all minion instances including their task
counts and drain state.
+ */
+public class MinionStatusResponse {
+ private int _currentMinionCount;
+ private List<MinionStatus> _minionStatus;
+
+ public MinionStatusResponse() {
+ }
+
+ public MinionStatusResponse(int currentMinionCount, List<MinionStatus>
minionStatus) {
+ _currentMinionCount = currentMinionCount;
+ _minionStatus = minionStatus;
+ }
+
+ @JsonProperty("currentMinionCount")
+ public int getCurrentMinionCount() {
+ return _currentMinionCount;
+ }
+
+ public void setCurrentMinionCount(int currentMinionCount) {
+ _currentMinionCount = currentMinionCount;
+ }
+
+ @JsonProperty("minionStatus")
+ public List<MinionStatus> getMinionStatus() {
+ return _minionStatus;
+ }
+
+ public void setMinionStatus(List<MinionStatus> minionStatus) {
+ _minionStatus = minionStatus;
+ }
+
+ /**
+ * Status information for a single minion instance.
+ */
+ public static class MinionStatus {
+ private String _instanceId;
+ private String _host;
+ private int _port;
+ private int _runningTaskCount;
+ private String _status;
+
+ public MinionStatus() {
+ }
+
+ public MinionStatus(String instanceId, String host, int port, int
runningTaskCount, String status) {
+ _instanceId = instanceId;
+ _host = host;
+ _port = port;
+ _runningTaskCount = runningTaskCount;
+ _status = status;
+ }
+
+ @JsonProperty("instanceId")
+ public String getInstanceId() {
+ return _instanceId;
+ }
+
+ public void setInstanceId(String instanceId) {
+ _instanceId = instanceId;
+ }
+
+ @JsonProperty("host")
+ public String getHost() {
+ return _host;
+ }
+
+ public void setHost(String host) {
+ _host = host;
+ }
+
+ @JsonProperty("port")
+ public int getPort() {
+ return _port;
+ }
+
+ public void setPort(int port) {
+ _port = port;
+ }
+
+ @JsonProperty("runningTaskCount")
+ public int getRunningTaskCount() {
+ return _runningTaskCount;
+ }
+
+ public void setRunningTaskCount(int runningTaskCount) {
+ _runningTaskCount = runningTaskCount;
+ }
+
+ @JsonProperty("status")
+ public String getStatus() {
+ return _status;
+ }
+
+ public void setStatus(String status) {
+ _status = status;
+ }
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotMinionRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotMinionRestletResource.java
new file mode 100644
index 00000000000..06dcd02b06e
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotMinionRestletResource.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import javax.inject.Inject;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.TargetType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.spi.utils.CommonConstants.DATABASE;
+import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ * REST API for minion-related operations
+ */
+@Api(tags = Constants.MINION_TAG, authorizations = {@Authorization(value =
SWAGGER_AUTHORIZATION_KEY),
+ @Authorization(value = DATABASE)})
+@SwaggerDefinition(securityDefinition =
@SecurityDefinition(apiKeyAuthDefinitions = {
+ @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
+ key = SWAGGER_AUTHORIZATION_KEY,
+ description = "The format of the key is ```\"Basic <token>\" or
\"Bearer <token>\"```"),
+ @ApiKeyAuthDefinition(name = DATABASE, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE,
+ description = "Database context passed through http header. If no
context is provided 'default' database "
+ + "context will be considered.")}))
+@Path("/")
+public class PinotMinionRestletResource {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PinotMinionRestletResource.class);
+
+ @Inject
+ PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
+
+ @GET
+ @Path("/minions/status")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_INSTANCE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Get status of all minion instances including task
counts and drain state",
+ notes = "Returns status information for all minion instances registered
in the cluster")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"),
+ @ApiResponse(code = 400, message = "Bad request - invalid status
filter"),
+ @ApiResponse(code = 500, message = "Internal error")
+ })
+ public MinionStatusResponse getMinionStatus(
+ @ApiParam(value = "Filter by status (ONLINE, OFFLINE, or DRAINED)")
+ @QueryParam("status") String status,
+ @ApiParam(value = "Include running task counts for each minion")
+ @QueryParam("includeTaskCounts") @DefaultValue("false") boolean
includeTaskCounts) {
+ try {
+ return _pinotHelixTaskResourceManager.getMinionStatus(status,
includeTaskCounts);
+ } catch (IllegalArgumentException e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.BAD_REQUEST);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, "Failed to get minion
status",
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 2da272c1c1b..4087c8fa9f4 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -42,6 +42,7 @@ import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
@@ -56,6 +57,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.utils.DateTimeUtils;
import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.controller.api.resources.MinionStatusResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.core.common.MinionConstants;
@@ -64,6 +66,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1299,7 +1302,7 @@ public class PinotHelixTaskResourceManager {
* <p>E.g. TaskQueue_DummyTask -> DummyTask (from Helix JobQueue name)
* <p>E.g. TaskQueue_DummyTask_Task_DummyTask_12345 -> DummyTask (from Helix
Job name)
*
- * @param name Pinot task name, Helix JobQueue name or Helix Job name
+ * @param name Pinot task name, Helix JobQueue name, or Helix Job name
* @return Task type
*/
public static String getTaskType(String name) {
@@ -1340,6 +1343,129 @@ public class PinotHelixTaskResourceManager {
return
MinionTaskMetadataUtils.getAllTaskMetadataLastUpdateTimeMs(propertyStore);
}
+ /**
+ * Gets the status of all minion instances, including their task counts and
drain state.
+ *
+ * @param statusFilter Optional filter by status ("ONLINE", "OFFLINE", or
"DRAINED"). If null, returns all minions.
+ * @param includeTaskCounts Whether to include running task counts For each
minion. Default false.
+ * @return MinionStatusResponse containing status information for minion
instances
+ */
+ public MinionStatusResponse getMinionStatus(String statusFilter, boolean
includeTaskCounts) {
+ // Validate status filter
+ if (statusFilter != null && !statusFilter.isEmpty()) {
+ if (!"ONLINE".equalsIgnoreCase(statusFilter)
+ && !"OFFLINE".equalsIgnoreCase(statusFilter)
+ && !"DRAINED".equalsIgnoreCase(statusFilter)) {
+ throw new IllegalArgumentException("Invalid status filter. Must be
'ONLINE', 'OFFLINE', or 'DRAINED'");
+ }
+ }
+
+ // Get all instances and filter for minions, then sort them
+ List<String> allInstances = _helixResourceManager.getAllInstances();
+ List<String> minionInstances = allInstances.stream()
+ .filter(InstanceTypeUtils::isMinion)
+ .sorted()
+ .collect(Collectors.toList());
+
+ // Get running task counts per minion only if requested (can be expensive)
+ Map<String, Integer> runningTaskCounts = new HashMap<>();
+ if (includeTaskCounts) {
+ try {
+ runningTaskCounts = getRunningTaskCountsPerMinion();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to get running task counts from task resource
manager", e);
+ // Continue with an empty map - task counts will be 0
+ }
+ }
+
+ // Build status list for each minion
+ List<MinionStatusResponse.MinionStatus> minionStatusList = new
ArrayList<>();
+ for (String minionInstanceId : minionInstances) {
+ InstanceConfig instanceConfig =
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+ if (instanceConfig == null) {
+ continue;
+ }
+
+ // Determine minion status: OFFLINE (disabled in Helix), DRAINED, or
ONLINE
+ boolean isEnabled = instanceConfig.getInstanceEnabled();
+ List<String> tags = instanceConfig.getTags();
+ boolean isDrained = tags != null &&
tags.contains(Helix.DRAINED_MINION_INSTANCE);
+
+ String status;
+ if (!isEnabled) {
+ status = "OFFLINE";
+ } else if (isDrained) {
+ status = "DRAINED";
+ } else {
+ status = "ONLINE";
+ }
+
+ // Apply status filter if specified
+ if (statusFilter != null && !statusFilter.isEmpty() &&
!status.equalsIgnoreCase(statusFilter)) {
+ continue;
+ }
+
+ // Get host and port
+ String host = instanceConfig.getHostName();
+ int port = Integer.parseInt(instanceConfig.getPort());
+
+ // Get the running task count for this minion
+ int runningTaskCount = runningTaskCounts.getOrDefault(minionInstanceId,
0);
+
+ MinionStatusResponse.MinionStatus minionStatus =
+ new MinionStatusResponse.MinionStatus(minionInstanceId, host, port,
runningTaskCount, status);
+ minionStatusList.add(minionStatus);
+ }
+
+ return new MinionStatusResponse(minionStatusList.size(), minionStatusList);
+ }
+
+ /**
+ * Gets the count of running tasks for each minion instance.
+ * This method iterates through all task workflows and jobs to count tasks
in RUNNING state
+ * assigned to each minion.
+ *
+ * @return Map of minion instance ID to running task count
+ */
+ public synchronized Map<String, Integer> getRunningTaskCountsPerMinion() {
+ Map<String, Integer> runningTaskCounts = new HashMap<>();
+
+ // Get all workflows (task queues)
+ Set<String> workflows = _taskDriver.getWorkflows().keySet();
+
+ for (String workflow : workflows) {
+ WorkflowContext workflowContext =
_taskDriver.getWorkflowContext(workflow);
+ if (workflowContext == null) {
+ continue;
+ }
+
+ // Get all jobs in this workflow
+ Set<String> jobs = workflowContext.getJobStates().keySet();
+ for (String job : jobs) {
+ JobContext jobContext = _taskDriver.getJobContext(job);
+ if (jobContext == null) {
+ continue;
+ }
+
+ // Iterate through all partitions (subtasks) in this job
+ Set<Integer> partitions = jobContext.getPartitionSet();
+ for (Integer partition : partitions) {
+ TaskPartitionState state = jobContext.getPartitionState(partition);
+ // Count INIT and RUNNING tasks (INIT means assigned but not yet
started)
+ if (state == TaskPartitionState.RUNNING || state ==
TaskPartitionState.INIT) {
+ String assignedParticipant =
jobContext.getAssignedParticipant(partition);
+ if (assignedParticipant != null) {
+ runningTaskCounts.put(assignedParticipant,
+ runningTaskCounts.getOrDefault(assignedParticipant, 0) + 1);
+ }
+ }
+ }
+ }
+ }
+
+ return runningTaskCounts;
+ }
+
@JsonPropertyOrder({
"taskState", "subtaskCount", "startTime", "executionStartTime",
"finishTime", "triggeredBy",
"subtaskInfos"
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotMinionRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotMinionRestletResourceTest.java
new file mode 100644
index 00000000000..21ddcfcea59
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotMinionRestletResourceTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.spi.config.instance.Instance;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class PinotMinionRestletResourceTest extends ControllerTest {
+
+ private ControllerRequestURLBuilder _urlBuilder = null;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ DEFAULT_INSTANCE.setupSharedStateAndValidate();
+ _urlBuilder = DEFAULT_INSTANCE.getControllerRequestURLBuilder();
+ }
+
+ @Test
+ public void testMinionStatusEndpoint()
+ throws Exception {
+ // Create 5 minion instances
+ List<String> minionIds = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ String host = "minion-status-test-" + i + ".example.com";
+ int port = 9514;
+ Instance minionInstance = new Instance(host, port, InstanceType.MINION,
+
Collections.singletonList(CommonConstants.Helix.UNTAGGED_MINION_INSTANCE),
null, 0, 0, 0, 0, false);
+ sendPostRequest(_urlBuilder.forInstanceCreate(),
minionInstance.toJsonString());
+ minionIds.add("Minion_" + host + "_" + port);
+ }
+
+ // Test 1: Get all minions (no filter)
+ String allMinionsUrl = _urlBuilder.forMinionStatus(null, false);
+ String allMinionsResponse = sendGetRequest(allMinionsUrl);
+ JsonNode allMinionsJson = JsonUtils.stringToJsonNode(allMinionsResponse);
+
+ assertNotNull(allMinionsJson.get("currentMinionCount"));
+ // Should have at least our 5 minions (may have more from other tests)
+ assertTrue(allMinionsJson.get("currentMinionCount").asInt() >= 5);
+ assertNotNull(allMinionsJson.get("minionStatus"));
+ assertTrue(allMinionsJson.get("minionStatus").isArray());
+
+ // Verify the structure of minion status
+ for (JsonNode minionStatus : allMinionsJson.get("minionStatus")) {
+ assertNotNull(minionStatus.get("instanceId"));
+ assertNotNull(minionStatus.get("host"));
+ assertNotNull(minionStatus.get("port"));
+ assertNotNull(minionStatus.get("runningTaskCount"));
+ assertNotNull(minionStatus.get("status"));
+ assertEquals(minionStatus.get("status").asText(), "ONLINE");
+ }
+
+ // Test 2: Drain 2 minions
+ String minion0 = minionIds.get(0);
+ String minion2 = minionIds.get(2);
+ sendPutRequest(_urlBuilder.forInstanceState(minion0) + "?state=DRAIN", "");
+ sendPutRequest(_urlBuilder.forInstanceState(minion2) + "?state=DRAIN", "");
+
+ // Test 3: Get only DRAINED minions
+ String drainedMinionsUrl = _urlBuilder.forMinionStatus("DRAINED", false);
+ String drainedMinionsResponse = sendGetRequest(drainedMinionsUrl);
+ JsonNode drainedMinionsJson =
JsonUtils.stringToJsonNode(drainedMinionsResponse);
+
+ assertTrue(drainedMinionsJson.get("currentMinionCount").asInt() >= 2);
+ for (JsonNode minionStatus : drainedMinionsJson.get("minionStatus")) {
+ assertEquals(minionStatus.get("status").asText(), "DRAINED");
+ }
+
+ // Test 4: Get only ONLINE minions
+ String onlineMinionsUrl = _urlBuilder.forMinionStatus("ONLINE", false);
+ String onlineMinionsResponse = sendGetRequest(onlineMinionsUrl);
+ JsonNode onlineMinionsJson =
JsonUtils.stringToJsonNode(onlineMinionsResponse);
+
+ assertTrue(onlineMinionsJson.get("currentMinionCount").asInt() >= 3);
+ for (JsonNode minionStatus : onlineMinionsJson.get("minionStatus")) {
+ assertEquals(minionStatus.get("status").asText(), "ONLINE");
+ }
+
+ // Cleanup
+ for (String minionId : minionIds) {
+ sendDeleteRequest(_urlBuilder.forInstance(minionId));
+ }
+ }
+
+ @Test
+ public void testMinionStatusEndpointCaseInsensitive()
+ throws Exception {
+ // Create a minion and drain it
+ String host = "minion-case-test.example.com";
+ int port = 9514;
+ Instance minionInstance = new Instance(host, port, InstanceType.MINION,
+
Collections.singletonList(CommonConstants.Helix.UNTAGGED_MINION_INSTANCE),
null, 0, 0, 0, 0, false);
+ sendPostRequest(_urlBuilder.forInstanceCreate(),
minionInstance.toJsonString());
+ String minionId = "Minion_" + host + "_" + port;
+ sendPutRequest(_urlBuilder.forInstanceState(minionId) + "?state=DRAIN",
"");
+
+ // Test case-insensitive status filter
+ String upperCaseUrl = _urlBuilder.forMinionStatus("DRAINED", false);
+ String lowerCaseUrl = _urlBuilder.forMinionStatus("drained", false);
+
+ String upperResponse = sendGetRequest(upperCaseUrl);
+ String lowerResponse = sendGetRequest(lowerCaseUrl);
+
+ JsonNode upperJson = JsonUtils.stringToJsonNode(upperResponse);
+ JsonNode lowerJson = JsonUtils.stringToJsonNode(lowerResponse);
+
+ // Both should return the same results
+ assertTrue(upperJson.get("currentMinionCount").asInt() >= 1);
+ assertTrue(lowerJson.get("currentMinionCount").asInt() >= 1);
+
+ // Cleanup
+ sendDeleteRequest(_urlBuilder.forInstance(minionId));
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ DEFAULT_INSTANCE.cleanup();
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerMinionStatusTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerMinionStatusTest.java
new file mode 100644
index 00000000000..91274a9aa82
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerMinionStatusTest.java
@@ -0,0 +1,812 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.pinot.controller.api.resources.MinionStatusResponse;
+import org.apache.pinot.controller.helix.ControllerTest;
+import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.spi.config.instance.Instance;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for minion status query functionality in
PinotHelixTaskResourceManager
+ */
+public class PinotHelixResourceManagerMinionStatusTest extends ControllerTest {
+
+ private PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ startZk();
+ startController();
+ _pinotHelixTaskResourceManager =
_controllerStarter.getHelixTaskResourceManager();
+ }
+
+ @AfterMethod
+ public void cleanupAfterTest() {
+ // Clean up all minion instances after each test to ensure test isolation
+ try {
+ for (String instanceId : _helixResourceManager.getAllInstances()) {
+ if (instanceId.startsWith("Minion_")) {
+ try {
+ _helixResourceManager.dropInstance(instanceId);
+ } catch (Exception e) {
+ // Ignore errors during cleanup
+ }
+ }
+ }
+ } catch (Exception e) {
+ // Ignore errors getting all instances
+ }
+ }
+
+ /**
+ * Helper method to create a mock TaskDriver with workflows and jobs
containing tasks assigned to minions
+ * @param taskAssignments Map of minion instance ID to number of RUNNING
tasks
+ * @return Mocked TaskDriver
+ */
+ private TaskDriver createMockTaskDriverWithRunningTasks(Map<String, Integer>
taskAssignments) {
+ TaskDriver mockTaskDriver = mock(TaskDriver.class);
+
+ // Create a mock workflows - getWorkflows() returns a Map<String,
WorkflowConfig>
+ Map<String, WorkflowConfig> workflows = new HashMap<>();
+ workflows.put("TestWorkflow1", mock(WorkflowConfig.class));
+ when(mockTaskDriver.getWorkflows()).thenReturn(workflows);
+
+ // Create workflow context with jobs
+ WorkflowContext workflowContext = mock(WorkflowContext.class);
+
when(mockTaskDriver.getWorkflowContext("TestWorkflow1")).thenReturn(workflowContext);
+
+ Map<String, TaskState> jobStates = new HashMap<>();
+ jobStates.put("TestJob1", TaskState.IN_PROGRESS);
+ when(workflowContext.getJobStates()).thenReturn(jobStates);
+
+ // Create job context with tasks assigned to minions
+ JobContext jobContext = mock(JobContext.class);
+ when(mockTaskDriver.getJobContext("TestJob1")).thenReturn(jobContext);
+
+ Set<Integer> partitions = new HashSet<>();
+ int partitionId = 0;
+ for (Map.Entry<String, Integer> entry : taskAssignments.entrySet()) {
+ String minionId = entry.getKey();
+ int taskCount = entry.getValue();
+ for (int i = 0; i < taskCount; i++) {
+ partitions.add(partitionId);
+
when(jobContext.getPartitionState(partitionId)).thenReturn(TaskPartitionState.RUNNING);
+
when(jobContext.getAssignedParticipant(partitionId)).thenReturn(minionId);
+ partitionId++;
+ }
+ }
+ when(jobContext.getPartitionSet()).thenReturn(partitions);
+
+ return mockTaskDriver;
+ }
+
+ @Test
+ public void testGetMinionStatusWithNoMinions() {
+ // Test with no minions registered - no tasks should be running
+ TaskDriver mockTaskDriver = createMockTaskDriverWithRunningTasks(new
HashMap<>());
+ PinotHelixTaskResourceManager taskResourceManager = new
PinotHelixTaskResourceManager(
+ _helixResourceManager, mockTaskDriver);
+
+ MinionStatusResponse response = taskResourceManager.getMinionStatus(null,
true);
+ assertNotNull(response);
+ assertEquals(response.getCurrentMinionCount(), 0);
+ assertNotNull(response.getMinionStatus());
+ assertTrue(response.getMinionStatus().isEmpty());
+ }
+
+ @Test
+ public void testGetMinionStatusWithOnlineMinions() {
+ // Create 3 online minions
+ for (int i = 0; i < 3; i++) {
+ String minionHost = "minion-test-" + i + ".example.com";
+ int minionPort = 9514;
+ Instance minionInstance = new Instance(minionHost, minionPort,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0,
0, 0, 0, false);
+ PinotResourceManagerResponse addResponse =
_helixResourceManager.addInstance(minionInstance, false);
+ assertTrue(addResponse.isSuccessful(), "Failed to add minion instance");
+ }
+
+ // Create a mock TaskDriver with running tasks: minion-0 has 2 tasks,
minion-1 has 1 task, minion-2 has 0 tasks
+ Map<String, Integer> taskAssignments = new HashMap<>();
+ taskAssignments.put("Minion_minion-test-0.example.com_9514", 2);
+ taskAssignments.put("Minion_minion-test-1.example.com_9514", 1);
+
+ TaskDriver mockTaskDriver =
createMockTaskDriverWithRunningTasks(taskAssignments);
+ PinotHelixTaskResourceManager taskResourceManager = new
PinotHelixTaskResourceManager(
+ _helixResourceManager, mockTaskDriver);
+
+ // Get all minions
+ MinionStatusResponse response = taskResourceManager.getMinionStatus(null,
true);
+ assertNotNull(response);
+ assertEquals(response.getCurrentMinionCount(), 3);
+ assertEquals(response.getMinionStatus().size(), 3);
+
+ // Verify all are ONLINE and check task counts
+ for (MinionStatusResponse.MinionStatus status :
response.getMinionStatus()) {
+ assertEquals(status.getStatus(), "ONLINE");
+ assertNotNull(status.getInstanceId());
+ assertNotNull(status.getHost());
+ assertTrue(status.getPort() > 0);
+
+ // Verify task counts match expected values
+ if
(status.getInstanceId().equals("Minion_minion-test-0.example.com_9514")) {
+ assertEquals(status.getRunningTaskCount(), 2);
+ } else if
(status.getInstanceId().equals("Minion_minion-test-1.example.com_9514")) {
+ assertEquals(status.getRunningTaskCount(), 1);
+ } else {
+ assertEquals(status.getRunningTaskCount(), 0);
+ }
+ }
+
+ // Cleanup
+ for (int i = 0; i < 3; i++) {
+ String minionInstanceId = "Minion_minion-test-" + i + ".example.com_" +
(9514);
+ _helixResourceManager.dropInstance(minionInstanceId);
+ }
+ }
+
+ @Test
+ public void testGetMinionStatusWithDrainedMinions() {
+ // Create 5 minions
+ for (int i = 0; i < 5; i++) {
+ String minionHost = "minion-drain-test-" + i + ".example.com";
+ int minionPort = 9514;
+ Instance minionInstance = new Instance(minionHost, minionPort,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0,
0, 0, 0, false);
+ PinotResourceManagerResponse addResponse =
_helixResourceManager.addInstance(minionInstance, false);
+ assertTrue(addResponse.isSuccessful());
+ }
+
+ // Drain minions 1 and 3
+ String minion1 = "Minion_minion-drain-test-1.example.com_9514";
+ String minion3 = "Minion_minion-drain-test-3.example.com_9514";
+ _helixResourceManager.drainMinionInstance(minion1);
+ _helixResourceManager.drainMinionInstance(minion3);
+
+ // Create a mock TaskDriver with running tasks: drained minions can still
have running tasks
+ Map<String, Integer> taskAssignments = new HashMap<>();
+ taskAssignments.put("Minion_minion-drain-test-0.example.com_9514", 3);
+ taskAssignments.put(minion1, 1); // Drained but still has 1 running task
+ taskAssignments.put("Minion_minion-drain-test-2.example.com_9514", 2);
+ taskAssignments.put(minion3, 2); // Drained but still has 2 running tasks
+
+ TaskDriver mockTaskDriver =
createMockTaskDriverWithRunningTasks(taskAssignments);
+ PinotHelixTaskResourceManager taskResourceManager = new
PinotHelixTaskResourceManager(
+ _helixResourceManager, mockTaskDriver);
+
+ // Get all minions
+ MinionStatusResponse allResponse =
taskResourceManager.getMinionStatus(null, true);
+ assertEquals(allResponse.getCurrentMinionCount(), 5);
+ assertEquals(allResponse.getMinionStatus().size(), 5);
+
+ // Count online vs drained
+ long onlineCount = allResponse.getMinionStatus().stream()
+ .filter(m -> "ONLINE".equals(m.getStatus()))
+ .count();
+ long drainedCount = allResponse.getMinionStatus().stream()
+ .filter(m -> "DRAINED".equals(m.getStatus()))
+ .count();
+ assertEquals(onlineCount, 3);
+ assertEquals(drainedCount, 2);
+
+ // Get only ONLINE minions
+ MinionStatusResponse onlineResponse =
taskResourceManager.getMinionStatus("ONLINE", true);
+ assertEquals(onlineResponse.getCurrentMinionCount(), 3);
+ assertEquals(onlineResponse.getMinionStatus().size(), 3);
+ for (MinionStatusResponse.MinionStatus status :
onlineResponse.getMinionStatus()) {
+ assertEquals(status.getStatus(), "ONLINE");
+ }
+
+ // Get only DRAINED minions
+ MinionStatusResponse drainedResponse =
taskResourceManager.getMinionStatus("DRAINED", true);
+ assertEquals(drainedResponse.getCurrentMinionCount(), 2);
+ assertEquals(drainedResponse.getMinionStatus().size(), 2);
+ for (MinionStatusResponse.MinionStatus status :
drainedResponse.getMinionStatus()) {
+ assertEquals(status.getStatus(), "DRAINED");
+ assertTrue(status.getInstanceId().equals(minion1) ||
status.getInstanceId().equals(minion3));
+ // Verify drained minions can still have running tasks
+ if (status.getInstanceId().equals(minion1)) {
+ assertEquals(status.getRunningTaskCount(), 1);
+ } else if (status.getInstanceId().equals(minion3)) {
+ assertEquals(status.getRunningTaskCount(), 2);
+ }
+ }
+
+ // Cleanup
+ for (int i = 0; i < 5; i++) {
+ String minionInstanceId = "Minion_minion-drain-test-" + i +
".example.com_" + 9514;
+ _helixResourceManager.dropInstance(minionInstanceId);
+ }
+ }
+
+ @Test
+ public void testGetMinionStatusWithFilter() {
+ // Create 6 minions, drain 3 of them
+ for (int i = 0; i < 6; i++) {
+ String minionHost = "minion-filter-" + i + ".example.com";
+ int minionPort = 9514;
+ Instance minionInstance = new Instance(minionHost, minionPort,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0,
0, 0, 0, false);
+ PinotResourceManagerResponse addResponse =
_helixResourceManager.addInstance(minionInstance, false);
+ assertTrue(addResponse.isSuccessful());
+ }
+
+ // Drain minions 0, 2, 4
+ for (int i = 0; i < 6; i += 2) {
+ String minionInstanceId = "Minion_minion-filter-" + i + ".example.com_"
+ (9514);
+ _helixResourceManager.drainMinionInstance(minionInstanceId);
+ }
+
+ // Create a mock TaskDriver with running tasks: drained minions have
tasks, online minions have tasks
+ Map<String, Integer> taskAssignments = new HashMap<>();
+ taskAssignments.put("Minion_minion-filter-0.example.com_9514", 2); //
Drained
+ taskAssignments.put("Minion_minion-filter-1.example.com_9514", 3); //
Online
+ taskAssignments.put("Minion_minion-filter-2.example.com_9514", 1); //
Drained
+ taskAssignments.put("Minion_minion-filter-3.example.com_9514", 2); //
Online
+ taskAssignments.put("Minion_minion-filter-4.example.com_9514", 1); //
Drained
+ taskAssignments.put("Minion_minion-filter-5.example.com_9514", 1); //
Online
+
+ TaskDriver mockTaskDriver =
createMockTaskDriverWithRunningTasks(taskAssignments);
+ PinotHelixTaskResourceManager taskResourceManager = new
PinotHelixTaskResourceManager(
+ _helixResourceManager, mockTaskDriver);
+
+ // Get drained minions
+ MinionStatusResponse response =
taskResourceManager.getMinionStatus("DRAINED", true);
+ assertEquals(response.getCurrentMinionCount(), 3);
+ assertEquals(response.getMinionStatus().size(), 3);
+ for (MinionStatusResponse.MinionStatus status :
response.getMinionStatus()) {
+ assertEquals(status.getStatus(), "DRAINED");
+ }
+
+ // Get online minions
+ MinionStatusResponse onlineResponse =
taskResourceManager.getMinionStatus("ONLINE", true);
+ assertEquals(onlineResponse.getCurrentMinionCount(), 3);
+ assertEquals(onlineResponse.getMinionStatus().size(), 3);
+ for (MinionStatusResponse.MinionStatus status :
onlineResponse.getMinionStatus()) {
+ assertEquals(status.getStatus(), "ONLINE");
+ }
+
+ // Cleanup
+ for (int i = 0; i < 6; i++) {
+ String minionInstanceId = "Minion_minion-filter-" + i + ".example.com_"
+ (9514);
+ _helixResourceManager.dropInstance(minionInstanceId);
+ }
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testGetMinionStatusWithInvalidFilter() {
+ // Create a minion
+ String minionHost = "minion-invalid-filter.example.com";
+ int minionPort = 9514;
+ Instance minionInstance = new Instance(minionHost, minionPort,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0,
0, 0, false);
+ _helixResourceManager.addInstance(minionInstance, false);
+
+ // Create a mock TaskDriver with running tasks
+ Map<String, Integer> taskAssignments = new HashMap<>();
+ taskAssignments.put("Minion_" + minionHost + "_" + minionPort, 1);
+
+ TaskDriver mockTaskDriver =
createMockTaskDriverWithRunningTasks(taskAssignments);
+ PinotHelixTaskResourceManager taskResourceManager = new
PinotHelixTaskResourceManager(
+ _helixResourceManager, mockTaskDriver);
+
+ try {
+ // Try with invalid status filter - should throw IllegalArgumentException
+ taskResourceManager.getMinionStatus("INVALID_STATUS", true);
+ } finally {
+ // Cleanup
+ String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+ _helixResourceManager.dropInstance(minionInstanceId);
+ }
+ }
+
+ @Test
+ public void testGetMinionStatusCaseInsensitiveFilter() {
+ // Create 2 minions, drain one
+ String minionHost1 = "minion-case-test-1.example.com";
+ String minionHost2 = "minion-case-test-2.example.com";
+ int minionPort = 9514;
+
+ Instance minion1 = new Instance(minionHost1, minionPort,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0,
0, 0, false);
+ Instance minion2 = new Instance(minionHost2, minionPort,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0,
0, 0, false);
+
+ _helixResourceManager.addInstance(minion1, false);
+ _helixResourceManager.addInstance(minion2, false);
+
+ String minion1Id = "Minion_" + minionHost1 + "_" + minionPort;
+ String minion2Id = "Minion_" + minionHost2 + "_" + minionPort;
+ _helixResourceManager.drainMinionInstance(minion1Id);
+
+ // Create a mock TaskDriver with running tasks
+ Map<String, Integer> taskAssignments = new HashMap<>();
+ taskAssignments.put(minion1Id, 2); // Drained minion with 2 tasks
+ taskAssignments.put(minion2Id, 3); // Online minion with 3 tasks
+
+ TaskDriver mockTaskDriver =
createMockTaskDriverWithRunningTasks(taskAssignments);
+ PinotHelixTaskResourceManager taskResourceManager = new
PinotHelixTaskResourceManager(
+ _helixResourceManager, mockTaskDriver);
+
+ // Test case-insensitive filters
+ MinionStatusResponse responseUpperCase =
taskResourceManager.getMinionStatus("DRAINED", true);
+ MinionStatusResponse responseLowerCase =
taskResourceManager.getMinionStatus("drained", true);
+ MinionStatusResponse responseMixedCase =
taskResourceManager.getMinionStatus("Drained", true);
+
+ assertEquals(responseUpperCase.getCurrentMinionCount(), 1);
+ assertEquals(responseLowerCase.getCurrentMinionCount(), 1);
+ assertEquals(responseMixedCase.getCurrentMinionCount(), 1);
+
+ // Verify task counts in all responses
+
assertEquals(responseUpperCase.getMinionStatus().get(0).getRunningTaskCount(),
2);
+
assertEquals(responseLowerCase.getMinionStatus().get(0).getRunningTaskCount(),
2);
+
assertEquals(responseMixedCase.getMinionStatus().get(0).getRunningTaskCount(),
2);
+
+ // Cleanup
+ _helixResourceManager.dropInstance(minion1Id);
+ _helixResourceManager.dropInstance(minion2Id);
+ }
+
+ @Test
+ public void testGetMinionStatusOnlyReturnsMinions() {
+ // Create different instance types
+ String brokerHost = "broker-test.example.com";
+ String serverHost = "server-test.example.com";
+ String minionHost = "minion-type-test.example.com";
+
+ Instance brokerInstance = new Instance(brokerHost, 8099,
InstanceType.BROKER,
+ Collections.singletonList("DefaultTenant_BROKER"), null, 0, 0, 0, 0,
false);
+ Instance serverInstance = new Instance(serverHost, 8098,
InstanceType.SERVER,
+ Collections.singletonList("DefaultTenant_OFFLINE"), null, 0, 0, 0, 0,
false);
+ Instance minionInstance = new Instance(minionHost, 9514,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0,
0, 0, false);
+
+ _helixResourceManager.addInstance(brokerInstance, false);
+ _helixResourceManager.addInstance(serverInstance, false);
+ _helixResourceManager.addInstance(minionInstance, false);
+
+ // Create a mock TaskDriver with running tasks - only for minion (broker
and server won't have task counts)
+ Map<String, Integer> taskAssignments = new HashMap<>();
+ taskAssignments.put("Minion_" + minionHost + "_9514", 5);
+
+ TaskDriver mockTaskDriver =
createMockTaskDriverWithRunningTasks(taskAssignments);
+ PinotHelixTaskResourceManager taskResourceManager = new
PinotHelixTaskResourceManager(
+ _helixResourceManager, mockTaskDriver);
+
+ // Get minion status - should only return the minion instance
+ MinionStatusResponse response = taskResourceManager.getMinionStatus(null,
true);
+ assertEquals(response.getCurrentMinionCount(), 1);
+ assertEquals(response.getMinionStatus().size(), 1);
+ assertEquals(response.getMinionStatus().get(0).getHost(), minionHost);
+ assertEquals(response.getMinionStatus().get(0).getRunningTaskCount(), 5);
+
+ // Cleanup
+ _helixResourceManager.dropInstance("Broker_" + brokerHost + "_8099");
+ _helixResourceManager.dropInstance("Server_" + serverHost + "_8098");
+ _helixResourceManager.dropInstance("Minion_" + minionHost + "_9514");
+ }
+
+ @Test
+ public void testGetMinionStatusReturnsCorrectPortAndHost() {
+ // Create minion with specific host and port
+ String expectedHost = "specific-minion-host.example.com";
+ int expectedPort = 9514;
+
+ Instance minionInstance = new Instance(expectedHost, expectedPort,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0,
0, 0, false);
+ _helixResourceManager.addInstance(minionInstance, false);
+
+ // Create a mock TaskDriver with running tasks
+ Map<String, Integer> taskAssignments = new HashMap<>();
+ taskAssignments.put("Minion_" + expectedHost + "_" + expectedPort, 4);
+
+ TaskDriver mockTaskDriver =
createMockTaskDriverWithRunningTasks(taskAssignments);
+ PinotHelixTaskResourceManager taskResourceManager = new
PinotHelixTaskResourceManager(
+ _helixResourceManager, mockTaskDriver);
+
+ // Get status and verify host/port
+ MinionStatusResponse response = taskResourceManager.getMinionStatus(null,
true);
+ assertEquals(response.getCurrentMinionCount(), 1);
+
+ MinionStatusResponse.MinionStatus status =
response.getMinionStatus().get(0);
+ assertEquals(status.getHost(), expectedHost);
+ assertEquals(status.getPort(), expectedPort);
+ assertEquals(status.getInstanceId(), "Minion_" + expectedHost + "_" +
expectedPort);
+ assertEquals(status.getRunningTaskCount(), 4);
+
+ // Cleanup
+ _helixResourceManager.dropInstance("Minion_" + expectedHost + "_" +
expectedPort);
+ }
+
+ @Test
+ public void testGetMinionStatusWithRunningTasks() {
+ // Create 3 minions
+ String minion1Id = "Minion_minion-running-1.example.com_9514";
+ String minion2Id = "Minion_minion-running-2.example.com_9514";
+ String minion3Id = "Minion_minion-running-3.example.com_9514";
+
+ Instance minion1 = new Instance("minion-running-1.example.com", 9514,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0,
0, 0, false);
+ Instance minion2 = new Instance("minion-running-2.example.com", 9514,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0,
0, 0, false);
+ Instance minion3 = new Instance("minion-running-3.example.com", 9514,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0,
0, 0, false);
+
+ _helixResourceManager.addInstance(minion1, false);
+ _helixResourceManager.addInstance(minion2, false);
+ _helixResourceManager.addInstance(minion3, false);
+
+ // Create a mock TaskDriver with running tasks: minion1 has 3 tasks,
minion2 has 1 task, minion3 has 0 tasks
+ Map<String, Integer> taskAssignments = new HashMap<>();
+ taskAssignments.put(minion1Id, 3);
+ taskAssignments.put(minion2Id, 1);
+ // minion3 not in map = 0 running tasks
+
+ TaskDriver mockTaskDriver =
createMockTaskDriverWithRunningTasks(taskAssignments);
+ PinotHelixTaskResourceManager taskResourceManager = new
PinotHelixTaskResourceManager(
+ _helixResourceManager, mockTaskDriver);
+
+ // Call getMinionStatus on the task resource manager
+ MinionStatusResponse response = taskResourceManager.getMinionStatus(null,
true);
+
+ assertNotNull(response);
+ assertEquals(response.getCurrentMinionCount(), 3);
+ assertEquals(response.getMinionStatus().size(), 3);
+
+ // Verify running task counts for each minion
+ for (MinionStatusResponse.MinionStatus status :
response.getMinionStatus()) {
+ if (status.getInstanceId().equals(minion1Id)) {
+ assertEquals(status.getRunningTaskCount(), 3);
+ } else if (status.getInstanceId().equals(minion2Id)) {
+ assertEquals(status.getRunningTaskCount(), 1);
+ } else if (status.getInstanceId().equals(minion3Id)) {
+ assertEquals(status.getRunningTaskCount(), 0);
+ }
+ }
+
+ // Cleanup
+ _helixResourceManager.dropInstance(minion1Id);
+ _helixResourceManager.dropInstance(minion2Id);
+ _helixResourceManager.dropInstance(minion3Id);
+ }
+
+ @Test
+ public void testGetMinionStatusWithRunningTasksAndInitTasks() {
+ // This test verifies that getMinionStatus correctly reports running task
counts
+ // including both RUNNING and INIT states (tasks assigned but not yet
started)
+
+ // Create 2 minions
+ String minion1Id = "Minion_minion-init-test-1.example.com_9514";
+ String minion2Id = "Minion_minion-init-test-2.example.com_9514";
+
+ Instance minion1 = new Instance("minion-init-test-1.example.com", 9514,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0,
0, 0, false);
+ Instance minion2 = new Instance("minion-init-test-2.example.com", 9514,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0,
0, 0, false);
+
+ _helixResourceManager.addInstance(minion1, false);
+ _helixResourceManager.addInstance(minion2, false);
+
+ // Create a mock TaskDriver with running tasks
+ // Simulate: minion1 has 3 RUNNING tasks, minion2 has 1 RUNNING task
+ Map<String, Integer> taskAssignments = new HashMap<>();
+ taskAssignments.put(minion1Id, 3); // 3 RUNNING tasks
+ taskAssignments.put(minion2Id, 1); // 1 RUNNING task
+
+ TaskDriver mockTaskDriver =
createMockTaskDriverWithRunningTasks(taskAssignments);
+ PinotHelixTaskResourceManager taskResourceManager = new
PinotHelixTaskResourceManager(
+ _helixResourceManager, mockTaskDriver);
+
+ // Call getMinionStatus
+ MinionStatusResponse response = taskResourceManager.getMinionStatus(null,
true);
+
+ assertNotNull(response);
+ assertEquals(response.getCurrentMinionCount(), 2);
+
+ // Verify that the running task counts include both RUNNING and INIT tasks
+ Map<String, Integer> resultCounts = new HashMap<>();
+ for (MinionStatusResponse.MinionStatus status :
response.getMinionStatus()) {
+ resultCounts.put(status.getInstanceId(), status.getRunningTaskCount());
+ }
+
+ assertEquals(resultCounts.get(minion1Id).intValue(), 3);
+ assertEquals(resultCounts.get(minion2Id).intValue(), 1);
+
+ // Cleanup
+ _helixResourceManager.dropInstance(minion1Id);
+ _helixResourceManager.dropInstance(minion2Id);
+ }
+
+ @Test
+ public void testGetMinionStatusWithDrainedMinionAndRunningTasks() {
+ // Test that drained minions can still show running task counts
+ // (tasks that were assigned before draining)
+
+ String minionId = "Minion_minion-drained-running.example.com_9514";
+ Instance minion = new Instance("minion-drained-running.example.com", 9514,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0,
0, 0, false);
+ _helixResourceManager.addInstance(minion, false);
+
+ // Drain the minion
+ _helixResourceManager.drainMinionInstance(minionId);
+
+ // Create a mock TaskDriver with running tasks
+ // Even though drained, minion might still have running tasks that were
assigned before draining
+ Map<String, Integer> taskAssignments = new HashMap<>();
+ taskAssignments.put(minionId, 2);
+
+ TaskDriver mockTaskDriver =
createMockTaskDriverWithRunningTasks(taskAssignments);
+ PinotHelixTaskResourceManager taskResourceManager = new
PinotHelixTaskResourceManager(
+ _helixResourceManager, mockTaskDriver);
+
+ // Get all minions
+ MinionStatusResponse response = taskResourceManager.getMinionStatus(null,
true);
+ assertEquals(response.getCurrentMinionCount(), 1);
+
+ MinionStatusResponse.MinionStatus status =
response.getMinionStatus().get(0);
+ assertEquals(status.getInstanceId(), minionId);
+ assertEquals(status.getStatus(), "DRAINED");
+ assertEquals(status.getRunningTaskCount(), 2); // Should still show
running tasks
+
+ // Get only DRAINED minions
+ MinionStatusResponse drainedResponse =
taskResourceManager.getMinionStatus("DRAINED", true);
+ assertEquals(drainedResponse.getCurrentMinionCount(), 1);
+
assertEquals(drainedResponse.getMinionStatus().get(0).getRunningTaskCount(), 2);
+
+ // Get only ONLINE minions
+ MinionStatusResponse onlineResponse =
taskResourceManager.getMinionStatus("ONLINE", true);
+ assertEquals(onlineResponse.getCurrentMinionCount(), 0);
+
+ // Cleanup
+ _helixResourceManager.dropInstance(minionId);
+ }
+
+ @Test
+ public void testGetMinionStatusRunningTaskCountException() {
+ // Test that getMinionStatus handles exceptions from
getRunningTaskCountsPerMinion gracefully
+
+ String minionId = "Minion_minion-exception-test.example.com_9514";
+ Instance minion = new Instance("minion-exception-test.example.com", 9514,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0,
0, 0, false);
+ _helixResourceManager.addInstance(minion, false);
+
+ // Create a mock TaskDriver that throws an exception when accessing
workflows
+ TaskDriver mockTaskDriver = mock(TaskDriver.class);
+ Mockito.doThrow(new RuntimeException("Simulated task driver failure"))
+ .when(mockTaskDriver).getWorkflows();
+
+ PinotHelixTaskResourceManager taskResourceManager = new
PinotHelixTaskResourceManager(
+ _helixResourceManager, mockTaskDriver);
+
+ // getMinionStatus should still work, just with 0 running task counts
+ MinionStatusResponse response = taskResourceManager.getMinionStatus(null,
true);
+
+ assertNotNull(response);
+ assertEquals(response.getCurrentMinionCount(), 1);
+ assertEquals(response.getMinionStatus().get(0).getRunningTaskCount(), 0);
+
+ // Cleanup
+ _helixResourceManager.dropInstance(minionId);
+ }
+
+ @Test
+ public void testGetMinionStatusWithOfflineMinions() {
+ // Test OFFLINE status (Helix disabled minions)
+ // Create 4 minions: 2 ONLINE, 1 OFFLINE, 1 DRAINED
+ String onlineMinion1 = "Minion_minion-offline-test-0.example.com_9514";
+ String offlineMinion = "Minion_minion-offline-test-1.example.com_9514";
+ String onlineMinion2 = "Minion_minion-offline-test-2.example.com_9514";
+ String drainedMinion = "Minion_minion-offline-test-3.example.com_9514";
+
+ for (int i = 0; i < 4; i++) {
+ Instance minion = new Instance("minion-offline-test-" + i +
".example.com", 9514, InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0,
0, 0, 0, false);
+ _helixResourceManager.addInstance(minion, false);
+ }
+
+ // Disable one minion in Helix (OFFLINE)
+ _helixResourceManager.disableInstance(offlineMinion);
+
+ // Drain one minion (DRAINED)
+ _helixResourceManager.drainMinionInstance(drainedMinion);
+
+ // Create a mock TaskDriver with running tasks
+ // Note: OFFLINE minions cannot have running tasks since they are disabled
in Helix
+ Map<String, Integer> taskAssignments = new HashMap<>();
+ taskAssignments.put(onlineMinion1, 2);
+ taskAssignments.put(onlineMinion2, 3);
+
+ TaskDriver mockTaskDriver =
createMockTaskDriverWithRunningTasks(taskAssignments);
+ PinotHelixTaskResourceManager taskResourceManager = new
PinotHelixTaskResourceManager(
+ _helixResourceManager, mockTaskDriver);
+
+ // Get all minions
+ MinionStatusResponse allResponse =
taskResourceManager.getMinionStatus(null, true);
+ assertEquals(allResponse.getCurrentMinionCount(), 4);
+ assertEquals(allResponse.getMinionStatus().size(), 4);
+
+ // Count statuses
+ Map<String, Long> statusCounts = allResponse.getMinionStatus().stream()
+
.collect(Collectors.groupingBy(MinionStatusResponse.MinionStatus::getStatus,
Collectors.counting()));
+ assertEquals(statusCounts.get("ONLINE").longValue(), 2);
+ assertEquals(statusCounts.get("OFFLINE").longValue(), 1);
+ assertEquals(statusCounts.get("DRAINED").longValue(), 1);
+
+ // Get only OFFLINE minions
+ MinionStatusResponse offlineResponse =
taskResourceManager.getMinionStatus("OFFLINE", true);
+ assertEquals(offlineResponse.getCurrentMinionCount(), 1);
+ assertEquals(offlineResponse.getMinionStatus().size(), 1);
+ assertEquals(offlineResponse.getMinionStatus().get(0).getInstanceId(),
offlineMinion);
+ assertEquals(offlineResponse.getMinionStatus().get(0).getStatus(),
"OFFLINE");
+
assertEquals(offlineResponse.getMinionStatus().get(0).getRunningTaskCount(), 0);
+
+ // Cleanup
+ for (int i = 0; i < 4; i++) {
+ _helixResourceManager.dropInstance("Minion_minion-offline-test-" + i +
".example.com_9514");
+ }
+ }
+
+ @Test
+ public void testGetMinionStatusWithoutTaskCounts() {
+ // Test that includeTaskCounts=false doesn't query task counts (all should
be 0)
+ String minion1Id = "Minion_minion-no-tasks-1.example.com_9514";
+ String minion2Id = "Minion_minion-no-tasks-2.example.com_9514";
+
+ Instance minion1 = new Instance("minion-no-tasks-1.example.com", 9514,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0,
0, 0, false);
+ Instance minion2 = new Instance("minion-no-tasks-2.example.com", 9514,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0,
0, 0, false);
+
+ _helixResourceManager.addInstance(minion1, false);
+ _helixResourceManager.addInstance(minion2, false);
+
+ // Create a mock TaskDriver with running tasks
+ Map<String, Integer> taskAssignments = new HashMap<>();
+ taskAssignments.put(minion1Id, 5);
+ taskAssignments.put(minion2Id, 3);
+
+ TaskDriver mockTaskDriver =
createMockTaskDriverWithRunningTasks(taskAssignments);
+ PinotHelixTaskResourceManager taskResourceManager = new
PinotHelixTaskResourceManager(
+ _helixResourceManager, mockTaskDriver);
+
+ // Call with includeTaskCounts=false - all task counts should be 0
+ MinionStatusResponse response = taskResourceManager.getMinionStatus(null,
false);
+
+ assertEquals(response.getCurrentMinionCount(), 2);
+ assertEquals(response.getMinionStatus().size(), 2);
+
+ // Verify all task counts are 0 (not queried)
+ for (MinionStatusResponse.MinionStatus status :
response.getMinionStatus()) {
+ assertEquals(status.getRunningTaskCount(), 0);
+ }
+
+ // Cleanup
+ _helixResourceManager.dropInstance(minion1Id);
+ _helixResourceManager.dropInstance(minion2Id);
+ }
+
+ @Test
+ public void testGetMinionStatusMixedStates() {
+ // Test with a mix of ONLINE, OFFLINE, and DRAINED minions
+ String onlineMinion = "Minion_minion-mixed-0.example.com_9514";
+ String offlineMinion = "Minion_minion-mixed-1.example.com_9514";
+ String drainedMinion = "Minion_minion-mixed-2.example.com_9514";
+ String onlineMinion2 = "Minion_minion-mixed-3.example.com_9514";
+
+ for (int i = 0; i < 4; i++) {
+ Instance minion = new Instance("minion-mixed-" + i + ".example.com",
9514, InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0,
0, 0, 0, false);
+ _helixResourceManager.addInstance(minion, false);
+ }
+
+ // Set different states
+ _helixResourceManager.disableInstance(offlineMinion); // OFFLINE
+ _helixResourceManager.drainMinionInstance(drainedMinion); // DRAINED
+
+ // OFFLINE minions cannot have running tasks since they are disabled in
Helix
+ Map<String, Integer> taskAssignments = new HashMap<>();
+ taskAssignments.put(onlineMinion, 1);
+ taskAssignments.put(drainedMinion, 3);
+ taskAssignments.put(onlineMinion2, 4);
+
+ TaskDriver mockTaskDriver =
createMockTaskDriverWithRunningTasks(taskAssignments);
+ PinotHelixTaskResourceManager taskResourceManager = new
PinotHelixTaskResourceManager(
+ _helixResourceManager, mockTaskDriver);
+
+ // Test filtering by each status
+ MinionStatusResponse onlineResponse =
taskResourceManager.getMinionStatus("ONLINE", true);
+ assertEquals(onlineResponse.getCurrentMinionCount(), 2);
+ for (MinionStatusResponse.MinionStatus status :
onlineResponse.getMinionStatus()) {
+ assertEquals(status.getStatus(), "ONLINE");
+ assertTrue(status.getRunningTaskCount() > 0);
+ }
+
+ MinionStatusResponse offlineResponse =
taskResourceManager.getMinionStatus("OFFLINE", true);
+ assertEquals(offlineResponse.getCurrentMinionCount(), 1);
+ assertEquals(offlineResponse.getMinionStatus().get(0).getStatus(),
"OFFLINE");
+ assertEquals(offlineResponse.getMinionStatus().get(0).getInstanceId(),
offlineMinion);
+
assertEquals(offlineResponse.getMinionStatus().get(0).getRunningTaskCount(), 0);
+
+ MinionStatusResponse drainedResponse =
taskResourceManager.getMinionStatus("DRAINED", true);
+ assertEquals(drainedResponse.getCurrentMinionCount(), 1);
+ assertEquals(drainedResponse.getMinionStatus().get(0).getStatus(),
"DRAINED");
+ assertEquals(drainedResponse.getMinionStatus().get(0).getInstanceId(),
drainedMinion);
+
+ // Cleanup
+ for (int i = 0; i < 4; i++) {
+ _helixResourceManager.dropInstance("Minion_minion-mixed-" + i +
".example.com_9514");
+ }
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testGetMinionStatusWithInvalidOfflineFilter() {
+ // Test that invalid status filter throws exception (test with various
invalid values)
+ String minionHost = "minion-invalid-offline.example.com";
+ int minionPort = 9514;
+ Instance minionInstance = new Instance(minionHost, minionPort,
InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0,
0, 0, false);
+ _helixResourceManager.addInstance(minionInstance, false);
+
+ TaskDriver mockTaskDriver = createMockTaskDriverWithRunningTasks(new
HashMap<>());
+ PinotHelixTaskResourceManager taskResourceManager = new
PinotHelixTaskResourceManager(
+ _helixResourceManager, mockTaskDriver);
+
+ try {
+ // Try with invalid status filter - should throw IllegalArgumentException
+ taskResourceManager.getMinionStatus("DISABLED", false);
+ } finally {
+ // Cleanup
+ _helixResourceManager.dropInstance("Minion_" + minionHost + "_" +
minionPort);
+ }
+ }
+
+ @AfterClass
+ public void tearDown() {
+ stopController();
+ stopZk();
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 2da3d879d0a..12fa14bcc82 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.utils.builder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -125,6 +126,21 @@ public class ControllerRequestURLBuilder {
return StringUtil.join("/", _baseUrl, "tasks", "task", taskName);
}
+ public String forMinionStatus(@Nullable String statusFilter, boolean
includeTaskCounts) {
+ StringBuilder url = new StringBuilder(StringUtil.join("/", _baseUrl,
"minions", "status"));
+ List<String> params = new ArrayList<>();
+ if (statusFilter != null && !statusFilter.isEmpty()) {
+ params.add("status=" + statusFilter);
+ }
+ if (includeTaskCounts) {
+ params.add("includeTaskCounts=true");
+ }
+ if (!params.isEmpty()) {
+ url.append("?").append(String.join("&", params));
+ }
+ return url.toString();
+ }
+
public String forStopMinionTaskQueue(String taskType) {
return StringUtil.join("/", _baseUrl, "tasks", taskType, "stop");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]