This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d33f32763e8 Add Support to tag a minion as Drained (#17375)
d33f32763e8 is described below

commit d33f32763e80e0fc6c2d1596e4162e1e4584d019
Author: Krishna Koganti <[email protected]>
AuthorDate: Wed Jan 7 04:48:32 2026 -0800

    Add Support to tag a minion as Drained (#17375)
    
    * Add Support to tag a minion as Drained
    
    * Don't use * in imports
    
    * Address comments
    
    * Address comments
    
    * Add functionality to restore previous tags on a drained minion
    
    * Address comments
---
 .../resources/PinotInstanceRestletResource.java    |  22 +-
 .../pinot/controller/api/resources/StateType.java  |   2 +-
 .../helix/core/PinotHelixResourceManager.java      |  96 ++++
 .../api/PinotInstanceRestletResourceTest.java      | 153 +++++++
 .../PinotHelixResourceManagerMinionDrainTest.java  | 495 +++++++++++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |   3 +
 6 files changed, 767 insertions(+), 4 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
index cdd8bfdad7c..80babcb368f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
@@ -66,6 +66,7 @@ import org.apache.pinot.core.auth.Authorize;
 import org.apache.pinot.core.auth.TargetType;
 import org.apache.pinot.spi.config.instance.Instance;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -256,7 +257,9 @@ public class PinotInstanceRestletResource {
   @Authenticate(AccessType.UPDATE)
   @Produces(MediaType.APPLICATION_JSON)
   @Consumes(MediaType.TEXT_PLAIN)
-  @ApiOperation(value = "Enable/disable an instance", notes = "Enable/disable 
an instance")
+  @ApiOperation(value = "Enable/disable/drain an instance", notes = 
"Enable/disable/drain an instance. "
+      + "DRAIN state is only applicable to minion instances and retags them 
from minion_untagged to minion_drained, "
+      + "preventing new task assignments while allowing existing tasks to 
complete.")
   @ApiResponses(value = {
       @ApiResponse(code = 200, message = "Success"),
       @ApiResponse(code = 400, message = "Bad Request"),
@@ -264,9 +267,10 @@ public class PinotInstanceRestletResource {
       @ApiResponse(code = 500, message = "Internal error")
   })
   public SuccessResponse toggleInstanceState(
-      @ApiParam(value = "Instance name", required = true, example = 
"Server_a.b.com_20000 | Broker_my.broker.com_30000")
+      @ApiParam(value = "Instance name", required = true, example = 
"Server_a.b.com_20000 | Broker_my.broker.com_30000 "
+          + "| Minion_hostname_9514")
       @PathParam("instanceName") String instanceName,
-      @ApiParam(value = "enable|disable", required = true) 
@QueryParam("state") String state) {
+      @ApiParam(value = "enable|disable|drain", required = true) 
@QueryParam("state") String state) {
     if (!_pinotHelixResourceManager.instanceExists(instanceName)) {
       throw new ControllerApplicationException(LOGGER, "Instance '" + 
instanceName + "' does not exist",
           Response.Status.NOT_FOUND);
@@ -286,6 +290,18 @@ public class PinotInstanceRestletResource {
             "Failed to disable instance '" + instanceName + "': " + 
response.getMessage(),
             Response.Status.INTERNAL_SERVER_ERROR);
       }
+    } else if (StateType.DRAIN.name().equalsIgnoreCase(state)) {
+      if (!InstanceTypeUtils.isMinion(instanceName)) {
+        throw new ControllerApplicationException(LOGGER,
+            "DRAIN state only applies to minion instances. Instance '" + 
instanceName + "' is not a minion.",
+            Response.Status.BAD_REQUEST);
+      }
+      PinotResourceManagerResponse response = 
_pinotHelixResourceManager.drainMinionInstance(instanceName);
+      if (!response.isSuccessful()) {
+        throw new ControllerApplicationException(LOGGER,
+            "Failed to drain minion instance '" + instanceName + "': " + 
response.getMessage(),
+            Response.Status.INTERNAL_SERVER_ERROR);
+      }
     } else {
       throw new ControllerApplicationException(LOGGER, "Unknown state '" + 
state + "'", Response.Status.BAD_REQUEST);
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/StateType.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/StateType.java
index e61834cf51f..879b49e368c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/StateType.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/StateType.java
@@ -19,5 +19,5 @@
 package org.apache.pinot.controller.api.resources;
 
 public enum StateType {
-  ENABLE, DISABLE, DROP
+  ENABLE, DISABLE, DROP, DRAIN
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index f02c6376e9d..ef65f6abcfb 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -3602,6 +3602,47 @@ public class PinotHelixResourceManager {
     return PinotResourceManagerResponse.success("Instance " + instanceName + " 
dropped");
   }
 
+  /**
+   * Drains a minion instance by preventing new task assignments while 
allowing existing tasks to complete.
+   * This is achieved by replacing all instance tags with minion_drained. 
Since Helix uses containsTag()
+   * for task assignment matching, keeping any existing tags would still allow 
task assignments.
+   *
+   * @param instanceName Name of the minion instance to drain
+   * @return Response indicating success or failure; a failure response (not an exception) is returned when the
+   *         instance is not found or the minion is already drained
+   */
+  public synchronized PinotResourceManagerResponse drainMinionInstance(String 
instanceName) {
+    InstanceConfig instanceConfig = getHelixInstanceConfig(instanceName);
+    if (instanceConfig == null) {
+      return PinotResourceManagerResponse.failure("Instance " + instanceName + 
" not found");
+    }
+
+    // Validate that minion is not already drained
+    List<String> currentTags = instanceConfig.getTags();
+    if (currentTags != null && 
currentTags.contains(Helix.DRAINED_MINION_INSTANCE)) {
+      return PinotResourceManagerResponse.failure("Minion instance " + 
instanceName + " is already drained");
+    }
+
+    // Store original tags so they can be restored when enabling the minion
+    if (currentTags != null && !currentTags.isEmpty()) {
+      instanceConfig.getRecord().setListField(Helix.PREVIOUS_TAGS, new 
ArrayList<>(currentTags));
+    }
+
+    // Replace all tags with minion_drained to prevent any task assignments
+    List<String> updatedTags = 
Collections.singletonList(Helix.DRAINED_MINION_INSTANCE);
+    instanceConfig.getRecord().setListField(
+        InstanceConfig.InstanceConfigProperty.TAG_LIST.name(), updatedTags);
+
+    // Save to Helix
+    if 
(!_helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceName), 
instanceConfig)) {
+      return PinotResourceManagerResponse.failure("Failed to set instance 
config for instance: " + instanceName);
+    }
+
+    LOGGER.info("Successfully drained minion instance: {}", instanceName);
+    return PinotResourceManagerResponse.success(
+        "Successfully drained minion instance: " + instanceName);
+  }
+
   /**
    * Utility to perform a safety check of the operation to drop an instance.
    * If the resource is not safe to drop the utility lists all the possible 
reasons.
@@ -3627,6 +3668,52 @@ public class PinotHelixResourceManager {
     return response.setSafe(response.getIssues().isEmpty());
   }
 
+  /**
+   * Restores previous tags for a drained minion instance if applicable.
+   * When a minion is drained, its original tags are stored and replaced with 
minion_drained.
+   * This method restores those original tags when enabling the instance.
+   *
+   * @param instanceName Name of the instance to check and restore tags for
+   * @return a failure response if saving the restored tags fails; null otherwise (restored, or nothing to do)
+   */
+  private PinotResourceManagerResponse restoreMinionTagsIfDrained(String 
instanceName) {
+    if (!InstanceTypeUtils.isMinion(instanceName)) {
+      return null;
+    }
+
+    InstanceConfig instanceConfig = getHelixInstanceConfig(instanceName);
+    // we can skip the null check for instanceConfig because instance 
existence is already
+    // validated in enableInstance method before this method is called
+    List<String> currentTags = instanceConfig.getTags();
+    if (currentTags == null || 
!currentTags.contains(Helix.DRAINED_MINION_INSTANCE)) {
+      // Not a drained minion, no action needed
+      return null;
+    }
+
+    // Restore pre-drain tags
+    List<String> preDrainTags = 
instanceConfig.getRecord().getListField(Helix.PREVIOUS_TAGS);
+    if (preDrainTags == null || preDrainTags.isEmpty()) {
+      // No previous tags recorded: clear the drained tag so the instance ends up with an empty tag list
+      instanceConfig.getRecord().setListField(
+          InstanceConfig.InstanceConfigProperty.TAG_LIST.name(), new 
ArrayList<>());
+    } else {
+      instanceConfig.getRecord().setListField(
+          InstanceConfig.InstanceConfigProperty.TAG_LIST.name(), new 
ArrayList<>(preDrainTags));
+      // Clear the stored pre-drain tags
+      instanceConfig.getRecord().getListFields().remove(Helix.PREVIOUS_TAGS);
+    }
+
+    // Save the updated config
+    if 
(!_helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceName), 
instanceConfig)) {
+      return PinotResourceManagerResponse.failure(
+          "Failed to restore tags for minion instance: " + instanceName);
+    }
+
+    LOGGER.info("Successfully restored tags for minion instance: {}", 
instanceName);
+    // Return null to continue with normal enable flow
+    return null;
+  }
+
   /**
    * Toggle the status of an Instance between OFFLINE and ONLINE.
    * Keeps checking until ideal-state is successfully updated or times out.
@@ -3641,6 +3728,15 @@ public class PinotHelixResourceManager {
       return PinotResourceManagerResponse.failure("Instance " + instanceName + 
" not found");
     }
 
+    // If enabling a drained minion, restore its previous tags
+    if (enableInstance) {
+      PinotResourceManagerResponse restoreResponse = 
restoreMinionTagsIfDrained(instanceName);
+      if (restoreResponse != null) {
+        // Failed to restore tags for a drained minion
+        return restoreResponse;
+      }
+    }
+
     _helixAdmin.enableInstance(_helixClusterName, instanceName, 
enableInstance);
     long intervalWaitTimeMs = 500L;
     long deadline = System.currentTimeMillis() + timeOutMs;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
index 9b7bad09edf..2306f1fd0fa 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
@@ -296,6 +296,159 @@ public class PinotInstanceRestletResourceTest extends 
ControllerTest {
     return tags;
   }
 
+  @Test
+  public void testDrainMinionInstance()
+      throws Exception {
+    // Create a minion instance with minion_untagged tag
+    String createInstanceUrl = _urlBuilder.forInstanceCreate();
+    Instance minionInstance =
+        new Instance("minion1.test.com", 9514, InstanceType.MINION,
+            Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE),
+            null, 0, 0, 0, 0, false);
+    sendPostRequest(createInstanceUrl, minionInstance.toJsonString());
+    String minionInstanceId = "Minion_minion1.test.com_9514";
+
+    // Verify the minion was created with minion_untagged tag
+    checkInstanceInfo(minionInstanceId, "minion1.test.com", 9514, new 
String[]{Helix.UNTAGGED_MINION_INSTANCE},
+        null, -1, -1, -1, -1, false);
+
+    // Drain the minion instance
+    String drainUrl = _urlBuilder.forInstanceState(minionInstanceId);
+    sendPutRequest(drainUrl + "?state=DRAIN", "");
+
+    // Verify the minion now has minion_drained tag instead of minion_untagged
+    JsonNode response = 
JsonUtils.stringToJsonNode(sendGetRequest(_urlBuilder.forInstance(minionInstanceId)));
+    assertEquals(response.get("instanceName").asText(), minionInstanceId);
+    JsonNode tags = response.get("tags");
+    assertEquals(tags.size(), 1);
+    assertEquals(tags.get(0).asText(), Helix.DRAINED_MINION_INSTANCE);
+
+    // Cleanup - delete the minion instance
+    sendDeleteRequest(_urlBuilder.forInstance(minionInstanceId));
+  }
+
+  @Test
+  public void testDrainMinionInstanceWithCustomTags()
+      throws Exception {
+    // Create a minion instance with custom tags - should succeed and replace 
ALL tags
+    String createInstanceUrl = _urlBuilder.forInstanceCreate();
+    List<String> tags = Arrays.asList(Helix.UNTAGGED_MINION_INSTANCE, 
"custom_tag1", "custom_tag2");
+    Instance minionInstance =
+        new Instance("minion2.test.com", 9515, InstanceType.MINION, tags, 
null, 0, 0, 0, 0, false);
+    sendPostRequest(createInstanceUrl, minionInstance.toJsonString());
+    String minionInstanceId = "Minion_minion2.test.com_9515";
+
+    // Drain the minion instance - should succeed
+    String drainUrl = _urlBuilder.forInstanceState(minionInstanceId);
+    sendPutRequest(drainUrl + "?state=DRAIN", "");
+
+    // Verify ALL tags were replaced with just minion_drained
+    JsonNode response = 
JsonUtils.stringToJsonNode(sendGetRequest(_urlBuilder.forInstance(minionInstanceId)));
+    JsonNode responseTags = response.get("tags");
+    assertEquals(responseTags.size(), 1);
+    assertEquals(responseTags.get(0).asText(), Helix.DRAINED_MINION_INSTANCE);
+
+    // Cleanup
+    sendDeleteRequest(_urlBuilder.forInstance(minionInstanceId));
+  }
+
+  @Test
+  public void testDrainMinionInstanceWithOnlyCustomTag()
+      throws Exception {
+    // Create a minion instance with only custom tag - should succeed and 
replace it
+    String createInstanceUrl = _urlBuilder.forInstanceCreate();
+    List<String> tags = Arrays.asList("custom_tag");
+    Instance minionInstance =
+        new Instance("minion3.test.com", 9516, InstanceType.MINION, tags, 
null, 0, 0, 0, 0, false);
+    sendPostRequest(createInstanceUrl, minionInstance.toJsonString());
+    String minionInstanceId = "Minion_minion3.test.com_9516";
+
+    // Drain the minion instance - should succeed
+    String drainUrl = _urlBuilder.forInstanceState(minionInstanceId);
+    sendPutRequest(drainUrl + "?state=DRAIN", "");
+
+    // Verify tag was replaced with minion_drained
+    JsonNode response = 
JsonUtils.stringToJsonNode(sendGetRequest(_urlBuilder.forInstance(minionInstanceId)));
+    JsonNode responseTags = response.get("tags");
+    assertEquals(responseTags.size(), 1);
+    assertEquals(responseTags.get(0).asText(), Helix.DRAINED_MINION_INSTANCE);
+
+    // Cleanup
+    sendDeleteRequest(_urlBuilder.forInstance(minionInstanceId));
+  }
+
+  @Test
+  public void testDrainMinionInstanceAlreadyDrainedFails()
+      throws Exception {
+    // Create a minion instance that's already drained - should fail to drain 
again
+    String createInstanceUrl = _urlBuilder.forInstanceCreate();
+    List<String> tags = Arrays.asList(Helix.DRAINED_MINION_INSTANCE);
+    Instance minionInstance =
+        new Instance("minion4.test.com", 9517, InstanceType.MINION, tags, 
null, 0, 0, 0, 0, false);
+    sendPostRequest(createInstanceUrl, minionInstance.toJsonString());
+    String minionInstanceId = "Minion_minion4.test.com_9517";
+
+    // Attempt to drain the already drained minion instance - should fail
+    String drainUrl = _urlBuilder.forInstanceState(minionInstanceId);
+    assertThrows(IOException.class, () -> sendPutRequest(drainUrl + 
"?state=DRAIN", ""));
+
+    // Cleanup
+    sendDeleteRequest(_urlBuilder.forInstance(minionInstanceId));
+  }
+
+  @Test
+  public void testDrainNonMinionInstanceFails()
+      throws Exception {
+    // Try to drain a broker instance (should fail)
+    String brokerInstanceId = "Broker_localhost_1234";
+    Instance brokerInstance =
+        new Instance("localhost", 1234, InstanceType.BROKER, 
Collections.singletonList("broker_tag"), null, 0, 0, 0, 0,
+            false);
+    sendPostRequest(_urlBuilder.forInstanceCreate(), 
brokerInstance.toJsonString());
+
+    // Attempt to drain the broker - should fail with 400 Bad Request
+    String drainUrl = _urlBuilder.forInstanceState(brokerInstanceId);
+    assertThrows(IOException.class, () -> sendPutRequest(drainUrl + 
"?state=DRAIN", ""));
+
+    // Cleanup
+    sendDeleteRequest(_urlBuilder.forInstance(brokerInstanceId));
+  }
+
+  @Test
+  public void testDrainNonExistentMinionFails()
+      throws Exception {
+    // Try to drain a non-existent minion instance
+    String nonExistentMinionId = "Minion_nonexistent_9999";
+    String drainUrl = _urlBuilder.forInstanceState(nonExistentMinionId);
+
+    // Should fail with 404 Not Found
+    assertThrows(IOException.class, () -> sendPutRequest(drainUrl + 
"?state=DRAIN", ""));
+  }
+
+  @Test
+  public void testDrainMinionWithEmptyTags()
+      throws Exception {
+    // Create a minion instance with no tags
+    String createInstanceUrl = _urlBuilder.forInstanceCreate();
+    Instance minionInstance =
+        new Instance("minion5.test.com", 9518, InstanceType.MINION, null, 
null, 0, 0, 0, 0, false);
+    sendPostRequest(createInstanceUrl, minionInstance.toJsonString());
+    String minionInstanceId = "Minion_minion5.test.com_9518";
+
+    // Drain the minion instance
+    String drainUrl = _urlBuilder.forInstanceState(minionInstanceId);
+    sendPutRequest(drainUrl + "?state=DRAIN", "");
+
+    // Verify minion_drained tag was added
+    JsonNode response = 
JsonUtils.stringToJsonNode(sendGetRequest(_urlBuilder.forInstance(minionInstanceId)));
+    JsonNode responseTags = response.get("tags");
+    assertEquals(responseTags.size(), 1);
+    assertEquals(responseTags.get(0).asText(), Helix.DRAINED_MINION_INSTANCE);
+
+    // Cleanup
+    sendDeleteRequest(_urlBuilder.forInstance(minionInstanceId));
+  }
+
   @AfterClass
   public void tearDown()
       throws Exception {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerMinionDrainTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerMinionDrainTest.java
new file mode 100644
index 00000000000..d2dd34ef8fd
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerMinionDrainTest.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.spi.config.instance.Instance;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for minion drain functionality in PinotHelixResourceManager
+ */
+public class PinotHelixResourceManagerMinionDrainTest extends ControllerTest {
+  private List<HelixManager> _fakeMinionHelixManagers = new 
java.util.ArrayList<>();
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    startZk();
+    startController();
+  }
+
+  /**
+   * Creates a fake minion HelixManager that connects to the cluster, creating 
a LiveInstance entry.
+   * This is needed for enableInstance to work in tests.
+   */
+  private void createFakeMinionLiveInstance(String instanceId)
+      throws Exception {
+    HelixManager helixManager =
+        HelixManagerFactory.getZKHelixManager(getHelixClusterName(),
+            instanceId, org.apache.helix.InstanceType.PARTICIPANT, getZkUrl());
+    helixManager.connect();
+    _fakeMinionHelixManagers.add(helixManager);
+  }
+
+  @Test
+  public void testDrainMinionInstanceBasic() {
+    // Create a minion instance with minion_untagged tag
+    String minionHost = "minion-test-1.example.com";
+    int minionPort = 9514;
+    Instance minionInstance =
+        new Instance(minionHost, minionPort, InstanceType.MINION, 
Collections.singletonList(
+            Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+
+    PinotResourceManagerResponse addResponse = 
_helixResourceManager.addInstance(minionInstance, false);
+    assertTrue(addResponse.isSuccessful(), "Failed to add minion instance");
+
+    String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+    // Verify initial state has minion_untagged tag
+    InstanceConfig instanceConfig = 
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+    assertNotNull(instanceConfig);
+    List<String> initialTags = instanceConfig.getTags();
+    assertEquals(initialTags.size(), 1);
+    assertTrue(initialTags.contains(Helix.UNTAGGED_MINION_INSTANCE));
+
+    // Drain the minion
+    PinotResourceManagerResponse drainResponse = 
_helixResourceManager.drainMinionInstance(minionInstanceId);
+    assertTrue(drainResponse.isSuccessful(), "Failed to drain minion: " + 
drainResponse.getMessage());
+
+    // Verify tags were updated
+    instanceConfig = 
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+    List<String> drainedTags = instanceConfig.getTags();
+    assertEquals(drainedTags.size(), 1);
+    assertFalse(drainedTags.contains(Helix.UNTAGGED_MINION_INSTANCE));
+    assertTrue(drainedTags.contains(Helix.DRAINED_MINION_INSTANCE));
+
+    // Cleanup
+    _helixResourceManager.dropInstance(minionInstanceId);
+  }
+
+  @Test
+  public void testDrainMinionInstanceWithCustomTags() {
+    // Create a minion with custom tags - should succeed and replace ALL tags
+    String minionHost = "minion-test-2.example.com";
+    int minionPort = 9515;
+    List<String> tags = Arrays.asList(Helix.UNTAGGED_MINION_INSTANCE, 
"custom_tag_1", "custom_tag_2");
+    Instance minionInstance = new Instance(minionHost, minionPort, 
InstanceType.MINION,
+        tags, null, 0, 0, 0, 0, false);
+
+    PinotResourceManagerResponse addResponse = 
_helixResourceManager.addInstance(minionInstance, false);
+    assertTrue(addResponse.isSuccessful());
+
+    String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+    // Drain the minion - should succeed
+    PinotResourceManagerResponse drainResponse = 
_helixResourceManager.drainMinionInstance(minionInstanceId);
+    assertTrue(drainResponse.isSuccessful());
+
+    // Verify ALL tags were replaced with just minion_drained
+    InstanceConfig instanceConfig = 
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+    List<String> drainedTags = instanceConfig.getTags();
+    assertEquals(drainedTags.size(), 1);
+    assertEquals(drainedTags.get(0), Helix.DRAINED_MINION_INSTANCE);
+
+    // Cleanup
+    _helixResourceManager.dropInstance(minionInstanceId);
+  }
+
+  @Test
+  public void testDrainMinionInstanceWithOnlyCustomTag() {
+    // Create a minion with only custom tag - should succeed and replace it
+    String minionHost = "minion-test-3.example.com";
+    int minionPort = 9516;
+    List<String> tags = Arrays.asList("custom_tag");
+    Instance minionInstance = new Instance(minionHost, minionPort, 
InstanceType.MINION,
+        tags, null, 0, 0, 0, 0, false);
+
+    PinotResourceManagerResponse addResponse = 
_helixResourceManager.addInstance(minionInstance, false);
+    assertTrue(addResponse.isSuccessful());
+
+    String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+    // Drain the minion - should succeed
+    PinotResourceManagerResponse drainResponse = 
_helixResourceManager.drainMinionInstance(minionInstanceId);
+    assertTrue(drainResponse.isSuccessful());
+
+    // Verify tag was replaced with minion_drained
+    InstanceConfig instanceConfig = 
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+    List<String> drainedTags = instanceConfig.getTags();
+    assertEquals(drainedTags.size(), 1);
+    assertEquals(drainedTags.get(0), Helix.DRAINED_MINION_INSTANCE);
+
+    // Cleanup
+    _helixResourceManager.dropInstance(minionInstanceId);
+  }
+
+  @Test
+  public void testDrainMinionInstanceAlreadyDrainedFails() {
+    // Create a minion that's already drained - draining again should return a failure response
+    String minionHost = "minion-test-4.example.com";
+    int minionPort = 9517;
+    List<String> tags = Arrays.asList(Helix.DRAINED_MINION_INSTANCE);
+    Instance minionInstance = new Instance(minionHost, minionPort, 
InstanceType.MINION,
+        tags, null, 0, 0, 0, 0, false);
+
+    PinotResourceManagerResponse addResponse = 
_helixResourceManager.addInstance(minionInstance, false);
+    assertTrue(addResponse.isSuccessful());
+
+    String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+    try {
+      // Attempt to drain the already drained minion - should return a failure response
+      PinotResourceManagerResponse drainResponse = 
_helixResourceManager.drainMinionInstance(minionInstanceId);
+      assertFalse(drainResponse.isSuccessful(), "Draining an already drained 
minion should have failed");
+    } finally {
+      // Cleanup
+      _helixResourceManager.dropInstance(minionInstanceId);
+    }
+  }
+
+  @Test
+  public void testDrainMinionWithEmptyTags() {
+    // Create a minion with no tags
+    String minionHost = "minion-test-5.example.com";
+    int minionPort = 9518;
+    Instance minionInstance = new Instance(minionHost, minionPort, 
InstanceType.MINION, null, null, 0, 0, 0, 0, false);
+
+    PinotResourceManagerResponse addResponse = 
_helixResourceManager.addInstance(minionInstance, false);
+    assertTrue(addResponse.isSuccessful());
+
+    String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+    // Drain the minion
+    PinotResourceManagerResponse drainResponse = 
_helixResourceManager.drainMinionInstance(minionInstanceId);
+    assertTrue(drainResponse.isSuccessful());
+
+    // Verify minion_drained tag was added
+    InstanceConfig instanceConfig = 
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+    List<String> drainedTags = instanceConfig.getTags();
+    assertEquals(drainedTags.size(), 1);
+    assertTrue(drainedTags.contains(Helix.DRAINED_MINION_INSTANCE));
+
+    // Cleanup
+    _helixResourceManager.dropInstance(minionInstanceId);
+  }
+
+  @Test
+  public void testDrainNonExistentMinionFails() {
+    // Try to drain a non-existent minion
+    String nonExistentMinionId = "Minion_nonexistent.example.com_9999";
+    PinotResourceManagerResponse drainResponse = 
_helixResourceManager.drainMinionInstance(nonExistentMinionId);
+    assertFalse(drainResponse.isSuccessful());
+    assertTrue(drainResponse.getMessage().contains("not found"));
+  }
+
+  @Test
+  public void testDrainMinionPreservesOtherInstanceConfigFields() {
+    // Create a minion with various config fields set
+    String minionHost = "minion-test-6.example.com";
+    int minionPort = 9519;
+    int grpcPort = 8090;
+    int adminPort = 8091;
+    Instance minionInstance =
+        new Instance(minionHost, minionPort, InstanceType.MINION, 
Collections.singletonList(
+            Helix.UNTAGGED_MINION_INSTANCE), null, grpcPort, adminPort, 0, 0, 
false);
+
+    PinotResourceManagerResponse addResponse = 
_helixResourceManager.addInstance(minionInstance, false);
+    assertTrue(addResponse.isSuccessful());
+
+    String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+    // Verify initial config
+    InstanceConfig initialConfig = 
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+    assertEquals(initialConfig.getHostName(), minionHost);
+    assertEquals(Integer.parseInt(initialConfig.getPort()), minionPort);
+    
assertEquals(initialConfig.getRecord().getSimpleField(Helix.Instance.GRPC_PORT_KEY),
 String.valueOf(grpcPort));
+    
assertEquals(initialConfig.getRecord().getSimpleField(Helix.Instance.ADMIN_PORT_KEY),
 String.valueOf(adminPort));
+
+    // Drain the minion
+    PinotResourceManagerResponse drainResponse = 
_helixResourceManager.drainMinionInstance(minionInstanceId);
+    assertTrue(drainResponse.isSuccessful());
+
+    // Verify other config fields remain unchanged
+    InstanceConfig drainedConfig = 
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+    assertEquals(drainedConfig.getHostName(), minionHost);
+    assertEquals(Integer.parseInt(drainedConfig.getPort()), minionPort);
+    
assertEquals(drainedConfig.getRecord().getSimpleField(Helix.Instance.GRPC_PORT_KEY),
 String.valueOf(grpcPort));
+    
assertEquals(drainedConfig.getRecord().getSimpleField(Helix.Instance.ADMIN_PORT_KEY),
 String.valueOf(adminPort));
+
+    // Verify only tags changed
+    List<String> drainedTags = drainedConfig.getTags();
+    assertEquals(drainedTags.size(), 1);
+    assertTrue(drainedTags.contains(Helix.DRAINED_MINION_INSTANCE));
+
+    // Cleanup
+    _helixResourceManager.dropInstance(minionInstanceId);
+  }
+
+  /**
+   * Drains the same minion several times and checks that only the first request is accepted.
+   *
+   * <p>The first drain must succeed and swap the untagged tag for the drained tag. Every later drain of the
+   * already-drained minion must come back as an unsuccessful response, and the tag list must stay exactly as the
+   * first drain left it.
+   */
+  @Test
+  public void testMultipleDrainOperationsOnSameMinion() {
+    // Create a minion
+    String minionHost = "minion-test-7.example.com";
+    int minionPort = 9520;
+    Instance minionInstance =
+        new Instance(minionHost, minionPort, InstanceType.MINION,
+            Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+
+    PinotResourceManagerResponse addResponse = _helixResourceManager.addInstance(minionInstance, false);
+    assertTrue(addResponse.isSuccessful());
+
+    String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+    // First drain should succeed
+    PinotResourceManagerResponse firstDrainResponse = _helixResourceManager.drainMinionInstance(minionInstanceId);
+    assertTrue(firstDrainResponse.isSuccessful(), "First drain operation failed");
+
+    // Verify state after first drain
+    InstanceConfig instanceConfig = _helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+    List<String> tags = instanceConfig.getTags();
+    assertEquals(tags.size(), 1);
+    assertTrue(tags.contains(Helix.DRAINED_MINION_INSTANCE));
+    assertFalse(tags.contains(Helix.UNTAGGED_MINION_INSTANCE));
+
+    // Subsequent drain attempts (2nd and 3rd) must be rejected with an unsuccessful response, because the
+    // minion is already drained. The rejection is asserted on the response; no exception is expected here.
+    for (int attempt = 2; attempt <= 3; attempt++) {
+      PinotResourceManagerResponse drainResponse = _helixResourceManager.drainMinionInstance(minionInstanceId);
+      assertFalse(drainResponse.isSuccessful(), "Subsequent drain operation " + attempt + " should have failed");
+    }
+
+    // Verify final state remains unchanged
+    instanceConfig = _helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+    tags = instanceConfig.getTags();
+    assertEquals(tags.size(), 1);
+    assertTrue(tags.contains(Helix.DRAINED_MINION_INSTANCE));
+
+    // Cleanup
+    _helixResourceManager.dropInstance(minionInstanceId);
+  }
+
+  /**
+   * Issues five drain requests for one minion from separate threads and checks the end state.
+   *
+   * <p>Each thread records whether its own request reported success; a request that ends in an
+   * {@link UnsupportedOperationException} is recorded as a failure. The test passes when at least one request
+   * succeeded and the minion ends up carrying only the drained tag.
+   */
+  @Test
+  public void testDrainMinionInstanceConcurrentSafety()
+      throws InterruptedException {
+    // Register a minion that carries only the default untagged tag
+    String minionHost = "minion-test-8.example.com";
+    int minionPort = 9521;
+    List<String> initialTags = Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE);
+    Instance minionInstance =
+        new Instance(minionHost, minionPort, InstanceType.MINION, initialTags, null, 0, 0, 0, 0, false);
+    assertTrue(_helixResourceManager.addInstance(minionInstance, false).isSuccessful());
+
+    String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+    // One worker thread and one result slot per drain request
+    int numWorkers = 5;
+    Thread[] workers = new Thread[numWorkers];
+    final boolean[] drainSucceeded = new boolean[numWorkers];
+
+    for (int w = 0; w < numWorkers; w++) {
+      final int slot = w;
+      workers[w] = new Thread(() -> {
+        try {
+          drainSucceeded[slot] = _helixResourceManager.drainMinionInstance(minionInstanceId).isSuccessful();
+        } catch (UnsupportedOperationException e) {
+          // Counted as a rejected request; expected once an earlier drain has already succeeded
+          drainSucceeded[slot] = false;
+        }
+      });
+      workers[w].start();
+    }
+
+    // Block until every worker has finished
+    for (Thread worker : workers) {
+      worker.join();
+    }
+
+    // At least one request has to win; the remaining ones are allowed to be rejected
+    int successCount = 0;
+    for (boolean succeeded : drainSucceeded) {
+      if (succeeded) {
+        successCount++;
+      }
+    }
+    assertTrue(successCount >= 1, "At least one concurrent drain operation should succeed");
+
+    // Final state: the drained tag is the only tag on the instance
+    List<String> finalTags = _helixResourceManager.getHelixInstanceConfig(minionInstanceId).getTags();
+    assertEquals(finalTags.size(), 1);
+    assertTrue(finalTags.contains(Helix.DRAINED_MINION_INSTANCE));
+    assertFalse(finalTags.contains(Helix.UNTAGGED_MINION_INSTANCE));
+
+    // Cleanup
+    _helixResourceManager.dropInstance(minionInstanceId);
+  }
+
+  /**
+   * Verifies the drain / enable round trip for a minion that starts with the default untagged tag.
+   *
+   * <p>Draining must leave only the drained tag and store the original tag list under
+   * {@code Helix.PREVIOUS_TAGS}; enabling must put the original tag back and clear the stored list.
+   */
+  @Test
+  public void testDrainThenEnableRestoresOriginalTags() throws Exception {
+    String minionHost = "minion-test-9.example.com";
+    int minionPort = 9522;
+    String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+    List<String> tagsBeforeDrain = Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE);
+
+    // Register the minion with the default untagged tag
+    Instance minionInstance =
+        new Instance(minionHost, minionPort, InstanceType.MINION, tagsBeforeDrain, null, 0, 0, 0, 0, false);
+    assertTrue(_helixResourceManager.addInstance(minionInstance, false).isSuccessful());
+
+    // enableInstance needs a live instance, so simulate the minion joining
+    createFakeMinionLiveInstance(minionInstanceId);
+
+    // Drain the minion: the drained tag must be the only tag left
+    assertTrue(_helixResourceManager.drainMinionInstance(minionInstanceId).isSuccessful());
+    InstanceConfig configAfterDrain = _helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+    List<String> tagsAfterDrain = configAfterDrain.getTags();
+    assertEquals(tagsAfterDrain.size(), 1);
+    assertEquals(tagsAfterDrain.get(0), Helix.DRAINED_MINION_INSTANCE);
+
+    // The pre-drain tags must have been saved on the instance config
+    List<String> savedTags = configAfterDrain.getRecord().getListField(Helix.PREVIOUS_TAGS);
+    assertNotNull(savedTags);
+    assertEquals(savedTags, tagsBeforeDrain);
+
+    // Enable the minion: the original tag must come back
+    assertTrue(_helixResourceManager.enableInstance(minionInstanceId).isSuccessful());
+    InstanceConfig configAfterEnable = _helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+    List<String> tagsAfterEnable = configAfterEnable.getTags();
+    assertEquals(tagsAfterEnable.size(), 1);
+    assertEquals(tagsAfterEnable.get(0), Helix.UNTAGGED_MINION_INSTANCE);
+
+    // The saved pre-drain tags must be gone (absent or empty)
+    List<String> leftoverSavedTags = configAfterEnable.getRecord().getListField(Helix.PREVIOUS_TAGS);
+    assertTrue(leftoverSavedTags == null || leftoverSavedTags.isEmpty());
+
+    // Cleanup
+    _helixResourceManager.dropInstance(minionInstanceId);
+  }
+
+  /**
+   * Verifies the drain / enable round trip for a minion that starts with two custom tags.
+   *
+   * <p>Draining must leave only the drained tag; enabling must bring back both custom tags and remove the
+   * drained tag.
+   */
+  @Test
+  public void testDrainThenEnableRestoresCustomTags() throws Exception {
+    String minionHost = "minion-test-10.example.com";
+    int minionPort = 9523;
+    String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+    List<String> customTags = Arrays.asList("minion_partition_A", "custom_tag");
+
+    // Register the minion with its custom tags
+    Instance minionInstance =
+        new Instance(minionHost, minionPort, InstanceType.MINION, customTags, null, 0, 0, 0, 0, false);
+    assertTrue(_helixResourceManager.addInstance(minionInstance, false).isSuccessful());
+
+    // enableInstance needs a live instance, so simulate the minion joining
+    createFakeMinionLiveInstance(minionInstanceId);
+
+    // Drain the minion: the drained tag must be the only tag left
+    assertTrue(_helixResourceManager.drainMinionInstance(minionInstanceId).isSuccessful());
+    InstanceConfig configAfterDrain = _helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+    assertEquals(configAfterDrain.getTags().size(), 1);
+    assertEquals(configAfterDrain.getTags().get(0), Helix.DRAINED_MINION_INSTANCE);
+
+    // Enable the minion: both custom tags must come back and the drained tag must go
+    assertTrue(_helixResourceManager.enableInstance(minionInstanceId).isSuccessful());
+    List<String> tagsAfterEnable = _helixResourceManager.getHelixInstanceConfig(minionInstanceId).getTags();
+    assertEquals(tagsAfterEnable.size(), 2);
+    assertTrue(tagsAfterEnable.contains("minion_partition_A"));
+    assertTrue(tagsAfterEnable.contains("custom_tag"));
+    assertFalse(tagsAfterEnable.contains(Helix.DRAINED_MINION_INSTANCE));
+
+    // Cleanup
+    _helixResourceManager.dropInstance(minionInstanceId);
+  }
+
+  /**
+   * Verifies that enabling a minion which was never drained leaves its single untagged tag in place.
+   */
+  @Test
+  public void testEnableNonDrainedMinionDoesNotModifyTags() throws Exception {
+    String minionHost = "minion-test-11.example.com";
+    int minionPort = 9524;
+    String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+    List<String> initialTags = Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE);
+
+    // Register an ordinary minion
+    Instance minionInstance =
+        new Instance(minionHost, minionPort, InstanceType.MINION, initialTags, null, 0, 0, 0, 0, false);
+    assertTrue(_helixResourceManager.addInstance(minionInstance, false).isSuccessful());
+
+    // enableInstance needs a live instance, so simulate the minion joining
+    createFakeMinionLiveInstance(minionInstanceId);
+
+    // Enable without a preceding drain
+    assertTrue(_helixResourceManager.enableInstance(minionInstanceId).isSuccessful());
+
+    // The tag list must be exactly what the minion was registered with
+    List<String> tagsAfterEnable = _helixResourceManager.getHelixInstanceConfig(minionInstanceId).getTags();
+    assertEquals(tagsAfterEnable.size(), 1);
+    assertEquals(tagsAfterEnable.get(0), Helix.UNTAGGED_MINION_INSTANCE);
+
+    // Cleanup
+    _helixResourceManager.dropInstance(minionInstanceId);
+  }
+
+  /**
+   * Class-level cleanup: disconnects every fake minion Helix manager that is still connected, empties the
+   * manager list, then calls {@code stopController()} and {@code stopZk()}.
+   */
+  @AfterClass
+  public void tearDown() {
+    for (HelixManager fakeMinionManager : _fakeMinionHelixManagers) {
+      // Nothing to disconnect for missing or already-disconnected managers
+      if (fakeMinionManager == null || !fakeMinionManager.isConnected()) {
+        continue;
+      }
+      fakeMinionManager.disconnect();
+    }
+    _fakeMinionHelixManagers.clear();
+    stopController();
+    stopZk();
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 69862016e8c..2779827ab1c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -159,6 +159,9 @@ public class CommonConstants {
     public static final String UNTAGGED_BROKER_INSTANCE = "broker_untagged";
     public static final String UNTAGGED_SERVER_INSTANCE = "server_untagged";
     public static final String UNTAGGED_MINION_INSTANCE = "minion_untagged";
+    // Tag carried by a drained minion; the drain tests expect it to be the instance's only tag after a drain.
+    public static final String DRAINED_MINION_INSTANCE = "minion_drained";
+
+    // Instance-config list-field key under which the tags a minion had before it was drained are kept.
+    public static final String PREVIOUS_TAGS = "previousTags";
 
     public static class StateModel {
       public static class SegmentStateModel {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to