This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d33f32763e8 Add Support to tag a minion as Drained (#17375)
d33f32763e8 is described below
commit d33f32763e80e0fc6c2d1596e4162e1e4584d019
Author: Krishna Koganti <[email protected]>
AuthorDate: Wed Jan 7 04:48:32 2026 -0800
Add Support to tag a minion as Drained (#17375)
* Add Support to tag a minion as Drained
* Don't use * in imports
* Address comments
* Address comments
* Add functionality to restore previous tags on a drained minion
* Address comments
---
.../resources/PinotInstanceRestletResource.java | 22 +-
.../pinot/controller/api/resources/StateType.java | 2 +-
.../helix/core/PinotHelixResourceManager.java | 96 ++++
.../api/PinotInstanceRestletResourceTest.java | 153 +++++++
.../PinotHelixResourceManagerMinionDrainTest.java | 495 +++++++++++++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 3 +
6 files changed, 767 insertions(+), 4 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
index cdd8bfdad7c..80babcb368f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
@@ -66,6 +66,7 @@ import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.spi.config.instance.Instance;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -256,7 +257,9 @@ public class PinotInstanceRestletResource {
@Authenticate(AccessType.UPDATE)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.TEXT_PLAIN)
- @ApiOperation(value = "Enable/disable an instance", notes = "Enable/disable
an instance")
+ @ApiOperation(value = "Enable/disable/drain an instance", notes =
"Enable/disable/drain an instance. "
+ + "DRAIN state is only applicable to minion instances and retags them
from minion_untagged to minion_drained, "
+ + "preventing new task assignments while allowing existing tasks to
complete.")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"),
@ApiResponse(code = 400, message = "Bad Request"),
@@ -264,9 +267,10 @@ public class PinotInstanceRestletResource {
@ApiResponse(code = 500, message = "Internal error")
})
public SuccessResponse toggleInstanceState(
- @ApiParam(value = "Instance name", required = true, example =
"Server_a.b.com_20000 | Broker_my.broker.com_30000")
+ @ApiParam(value = "Instance name", required = true, example =
"Server_a.b.com_20000 | Broker_my.broker.com_30000 "
+ + "| Minion_hostname_9514")
@PathParam("instanceName") String instanceName,
- @ApiParam(value = "enable|disable", required = true)
@QueryParam("state") String state) {
+ @ApiParam(value = "enable|disable|drain", required = true)
@QueryParam("state") String state) {
if (!_pinotHelixResourceManager.instanceExists(instanceName)) {
throw new ControllerApplicationException(LOGGER, "Instance '" +
instanceName + "' does not exist",
Response.Status.NOT_FOUND);
@@ -286,6 +290,18 @@ public class PinotInstanceRestletResource {
"Failed to disable instance '" + instanceName + "': " +
response.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR);
}
+ } else if (StateType.DRAIN.name().equalsIgnoreCase(state)) {
+ if (!InstanceTypeUtils.isMinion(instanceName)) {
+ throw new ControllerApplicationException(LOGGER,
+ "DRAIN state only applies to minion instances. Instance '" +
instanceName + "' is not a minion.",
+ Response.Status.BAD_REQUEST);
+ }
+ PinotResourceManagerResponse response =
_pinotHelixResourceManager.drainMinionInstance(instanceName);
+ if (!response.isSuccessful()) {
+ throw new ControllerApplicationException(LOGGER,
+ "Failed to drain minion instance '" + instanceName + "': " +
response.getMessage(),
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
} else {
throw new ControllerApplicationException(LOGGER, "Unknown state '" +
state + "'", Response.Status.BAD_REQUEST);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/StateType.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/StateType.java
index e61834cf51f..879b49e368c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/StateType.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/StateType.java
@@ -19,5 +19,5 @@
package org.apache.pinot.controller.api.resources;
public enum StateType {
- ENABLE, DISABLE, DROP
+ ENABLE, DISABLE, DROP, DRAIN
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index f02c6376e9d..ef65f6abcfb 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -3602,6 +3602,47 @@ public class PinotHelixResourceManager {
return PinotResourceManagerResponse.success("Instance " + instanceName + "
dropped");
}
+ /**
+ * Drains a minion instance by preventing new task assignments while
allowing existing tasks to complete.
+ * This is achieved by replacing all instance tags with minion_drained.
Since Helix uses containsTag()
+ * for task assignment matching, keeping any existing tags would still allow
task assignments.
+ *
+ * @param instanceName Name of the minion instance to drain
+ * @return Response indicating success, or failure if the instance is not found, the minion is already drained,
+ * or the updated instance config cannot be saved
+ */
+ public synchronized PinotResourceManagerResponse drainMinionInstance(String
instanceName) {
+ InstanceConfig instanceConfig = getHelixInstanceConfig(instanceName);
+ if (instanceConfig == null) {
+ return PinotResourceManagerResponse.failure("Instance " + instanceName +
" not found");
+ }
+
+ // Validate that minion is not already drained
+ List<String> currentTags = instanceConfig.getTags();
+ if (currentTags != null &&
currentTags.contains(Helix.DRAINED_MINION_INSTANCE)) {
+ return PinotResourceManagerResponse.failure("Minion instance " +
instanceName + " is already drained");
+ }
+
+ // Store original tags so they can be restored when enabling the minion
+ if (currentTags != null && !currentTags.isEmpty()) {
+ instanceConfig.getRecord().setListField(Helix.PREVIOUS_TAGS, new
ArrayList<>(currentTags));
+ }
+
+ // Replace all tags with minion_drained to prevent any task assignments
+ List<String> updatedTags =
Collections.singletonList(Helix.DRAINED_MINION_INSTANCE);
+ instanceConfig.getRecord().setListField(
+ InstanceConfig.InstanceConfigProperty.TAG_LIST.name(), updatedTags);
+
+ // Save to Helix
+ if
(!_helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceName),
instanceConfig)) {
+ return PinotResourceManagerResponse.failure("Failed to set instance
config for instance: " + instanceName);
+ }
+
+ LOGGER.info("Successfully drained minion instance: {}", instanceName);
+ return PinotResourceManagerResponse.success(
+ "Successfully drained minion instance: " + instanceName);
+ }
+
/**
* Utility to perform a safety check of the operation to drop an instance.
* If the resource is not safe to drop the utility lists all the possible
reasons.
@@ -3627,6 +3668,52 @@ public class PinotHelixResourceManager {
return response.setSafe(response.getIssues().isEmpty());
}
+ /**
+ * Restores previous tags for a drained minion instance if applicable.
+ * When a minion is drained, its original tags are stored and replaced with
minion_drained.
+ * This method restores those original tags when enabling the instance.
+ *
+ * @param instanceName Name of the instance to check and restore tags for
+ * @return PinotResourceManagerResponse indicating failure if restoration
fails, or null otherwise (no action needed, or tags restored successfully)
+ */
+ private PinotResourceManagerResponse restoreMinionTagsIfDrained(String
instanceName) {
+ if (!InstanceTypeUtils.isMinion(instanceName)) {
+ return null;
+ }
+
+ InstanceConfig instanceConfig = getHelixInstanceConfig(instanceName);
+ // we can skip the null check for instanceConfig because instance
existence is already
+ // validated in enableInstance method before this method is called
+ List<String> currentTags = instanceConfig.getTags();
+ if (currentTags == null ||
!currentTags.contains(Helix.DRAINED_MINION_INSTANCE)) {
+ // Not a drained minion, no action needed
+ return null;
+ }
+
+ // Restore pre-drain tags
+ List<String> preDrainTags =
instanceConfig.getRecord().getListField(Helix.PREVIOUS_TAGS);
+ if (preDrainTags == null || preDrainTags.isEmpty()) {
+ // No previous tags were stored, so clear the tag list to drop the drained
tag
+ instanceConfig.getRecord().setListField(
+ InstanceConfig.InstanceConfigProperty.TAG_LIST.name(), new
ArrayList<>());
+ } else {
+ instanceConfig.getRecord().setListField(
+ InstanceConfig.InstanceConfigProperty.TAG_LIST.name(), new
ArrayList<>(preDrainTags));
+ // Clear the stored pre-drain tags
+ instanceConfig.getRecord().getListFields().remove(Helix.PREVIOUS_TAGS);
+ }
+
+ // Save the updated config
+ if
(!_helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceName),
instanceConfig)) {
+ return PinotResourceManagerResponse.failure(
+ "Failed to restore tags for minion instance: " + instanceName);
+ }
+
+ LOGGER.info("Successfully restored tags for minion instance: {}",
instanceName);
+ // Return null to continue with normal enable flow
+ return null;
+ }
+
/**
* Toggle the status of an Instance between OFFLINE and ONLINE.
* Keeps checking until ideal-state is successfully updated or times out.
@@ -3641,6 +3728,15 @@ public class PinotHelixResourceManager {
return PinotResourceManagerResponse.failure("Instance " + instanceName +
" not found");
}
+ // If enabling a drained minion, restore its previous tags
+ if (enableInstance) {
+ PinotResourceManagerResponse restoreResponse =
restoreMinionTagsIfDrained(instanceName);
+ if (restoreResponse != null) {
+ // Failed to restore tags for a drained minion
+ return restoreResponse;
+ }
+ }
+
_helixAdmin.enableInstance(_helixClusterName, instanceName,
enableInstance);
long intervalWaitTimeMs = 500L;
long deadline = System.currentTimeMillis() + timeOutMs;
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
index 9b7bad09edf..2306f1fd0fa 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
@@ -296,6 +296,159 @@ public class PinotInstanceRestletResourceTest extends
ControllerTest {
return tags;
}
+ @Test
+ public void testDrainMinionInstance()
+ throws Exception {
+ // Create a minion instance with minion_untagged tag
+ String createInstanceUrl = _urlBuilder.forInstanceCreate();
+ Instance minionInstance =
+ new Instance("minion1.test.com", 9514, InstanceType.MINION,
+ Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE),
+ null, 0, 0, 0, 0, false);
+ sendPostRequest(createInstanceUrl, minionInstance.toJsonString());
+ String minionInstanceId = "Minion_minion1.test.com_9514";
+
+ // Verify the minion was created with minion_untagged tag
+ checkInstanceInfo(minionInstanceId, "minion1.test.com", 9514, new
String[]{Helix.UNTAGGED_MINION_INSTANCE},
+ null, -1, -1, -1, -1, false);
+
+ // Drain the minion instance
+ String drainUrl = _urlBuilder.forInstanceState(minionInstanceId);
+ sendPutRequest(drainUrl + "?state=DRAIN", "");
+
+ // Verify the minion now has minion_drained tag instead of minion_untagged
+ JsonNode response =
JsonUtils.stringToJsonNode(sendGetRequest(_urlBuilder.forInstance(minionInstanceId)));
+ assertEquals(response.get("instanceName").asText(), minionInstanceId);
+ JsonNode tags = response.get("tags");
+ assertEquals(tags.size(), 1);
+ assertEquals(tags.get(0).asText(), Helix.DRAINED_MINION_INSTANCE);
+
+ // Cleanup - delete the minion instance
+ sendDeleteRequest(_urlBuilder.forInstance(minionInstanceId));
+ }
+
+ @Test
+ public void testDrainMinionInstanceWithCustomTags()
+ throws Exception {
+ // Create a minion instance with custom tags - should succeed and replace
ALL tags
+ String createInstanceUrl = _urlBuilder.forInstanceCreate();
+ List<String> tags = Arrays.asList(Helix.UNTAGGED_MINION_INSTANCE,
"custom_tag1", "custom_tag2");
+ Instance minionInstance =
+ new Instance("minion2.test.com", 9515, InstanceType.MINION, tags,
null, 0, 0, 0, 0, false);
+ sendPostRequest(createInstanceUrl, minionInstance.toJsonString());
+ String minionInstanceId = "Minion_minion2.test.com_9515";
+
+ // Drain the minion instance - should succeed
+ String drainUrl = _urlBuilder.forInstanceState(minionInstanceId);
+ sendPutRequest(drainUrl + "?state=DRAIN", "");
+
+ // Verify ALL tags were replaced with just minion_drained
+ JsonNode response =
JsonUtils.stringToJsonNode(sendGetRequest(_urlBuilder.forInstance(minionInstanceId)));
+ JsonNode responseTags = response.get("tags");
+ assertEquals(responseTags.size(), 1);
+ assertEquals(responseTags.get(0).asText(), Helix.DRAINED_MINION_INSTANCE);
+
+ // Cleanup
+ sendDeleteRequest(_urlBuilder.forInstance(minionInstanceId));
+ }
+
+ @Test
+ public void testDrainMinionInstanceWithOnlyCustomTag()
+ throws Exception {
+ // Create a minion instance with only custom tag - should succeed and
replace it
+ String createInstanceUrl = _urlBuilder.forInstanceCreate();
+ List<String> tags = Arrays.asList("custom_tag");
+ Instance minionInstance =
+ new Instance("minion3.test.com", 9516, InstanceType.MINION, tags,
null, 0, 0, 0, 0, false);
+ sendPostRequest(createInstanceUrl, minionInstance.toJsonString());
+ String minionInstanceId = "Minion_minion3.test.com_9516";
+
+ // Drain the minion instance - should succeed
+ String drainUrl = _urlBuilder.forInstanceState(minionInstanceId);
+ sendPutRequest(drainUrl + "?state=DRAIN", "");
+
+ // Verify tag was replaced with minion_drained
+ JsonNode response =
JsonUtils.stringToJsonNode(sendGetRequest(_urlBuilder.forInstance(minionInstanceId)));
+ JsonNode responseTags = response.get("tags");
+ assertEquals(responseTags.size(), 1);
+ assertEquals(responseTags.get(0).asText(), Helix.DRAINED_MINION_INSTANCE);
+
+ // Cleanup
+ sendDeleteRequest(_urlBuilder.forInstance(minionInstanceId));
+ }
+
+ @Test
+ public void testDrainMinionInstanceAlreadyDrainedFails()
+ throws Exception {
+ // Create a minion instance that's already drained - should fail to drain
again
+ String createInstanceUrl = _urlBuilder.forInstanceCreate();
+ List<String> tags = Arrays.asList(Helix.DRAINED_MINION_INSTANCE);
+ Instance minionInstance =
+ new Instance("minion4.test.com", 9517, InstanceType.MINION, tags,
null, 0, 0, 0, 0, false);
+ sendPostRequest(createInstanceUrl, minionInstance.toJsonString());
+ String minionInstanceId = "Minion_minion4.test.com_9517";
+
+ // Attempt to drain the already drained minion instance - should fail
+ String drainUrl = _urlBuilder.forInstanceState(minionInstanceId);
+ assertThrows(IOException.class, () -> sendPutRequest(drainUrl +
"?state=DRAIN", ""));
+
+ // Cleanup
+ sendDeleteRequest(_urlBuilder.forInstance(minionInstanceId));
+ }
+
+ @Test
+ public void testDrainNonMinionInstanceFails()
+ throws Exception {
+ // Try to drain a broker instance (should fail)
+ String brokerInstanceId = "Broker_localhost_1234";
+ Instance brokerInstance =
+ new Instance("localhost", 1234, InstanceType.BROKER,
Collections.singletonList("broker_tag"), null, 0, 0, 0, 0,
+ false);
+ sendPostRequest(_urlBuilder.forInstanceCreate(),
brokerInstance.toJsonString());
+
+ // Attempt to drain the broker - should fail with 400 Bad Request
+ String drainUrl = _urlBuilder.forInstanceState(brokerInstanceId);
+ assertThrows(IOException.class, () -> sendPutRequest(drainUrl +
"?state=DRAIN", ""));
+
+ // Cleanup
+ sendDeleteRequest(_urlBuilder.forInstance(brokerInstanceId));
+ }
+
+ @Test
+ public void testDrainNonExistentMinionFails()
+ throws Exception {
+ // Try to drain a non-existent minion instance
+ String nonExistentMinionId = "Minion_nonexistent_9999";
+ String drainUrl = _urlBuilder.forInstanceState(nonExistentMinionId);
+
+ // Should fail with 404 Not Found
+ assertThrows(IOException.class, () -> sendPutRequest(drainUrl +
"?state=DRAIN", ""));
+ }
+
+ @Test
+ public void testDrainMinionWithEmptyTags()
+ throws Exception {
+ // Create a minion instance with no tags
+ String createInstanceUrl = _urlBuilder.forInstanceCreate();
+ Instance minionInstance =
+ new Instance("minion5.test.com", 9518, InstanceType.MINION, null,
null, 0, 0, 0, 0, false);
+ sendPostRequest(createInstanceUrl, minionInstance.toJsonString());
+ String minionInstanceId = "Minion_minion5.test.com_9518";
+
+ // Drain the minion instance
+ String drainUrl = _urlBuilder.forInstanceState(minionInstanceId);
+ sendPutRequest(drainUrl + "?state=DRAIN", "");
+
+ // Verify minion_drained tag was added
+ JsonNode response =
JsonUtils.stringToJsonNode(sendGetRequest(_urlBuilder.forInstance(minionInstanceId)));
+ JsonNode responseTags = response.get("tags");
+ assertEquals(responseTags.size(), 1);
+ assertEquals(responseTags.get(0).asText(), Helix.DRAINED_MINION_INSTANCE);
+
+ // Cleanup
+ sendDeleteRequest(_urlBuilder.forInstance(minionInstanceId));
+ }
+
@AfterClass
public void tearDown()
throws Exception {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerMinionDrainTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerMinionDrainTest.java
new file mode 100644
index 00000000000..d2dd34ef8fd
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerMinionDrainTest.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.spi.config.instance.Instance;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for minion drain functionality in PinotHelixResourceManager
+ */
+public class PinotHelixResourceManagerMinionDrainTest extends ControllerTest {
+ private List<HelixManager> _fakeMinionHelixManagers = new
java.util.ArrayList<>();
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ startZk();
+ startController();
+ }
+
+ /**
+ * Creates a fake minion HelixManager that connects to the cluster, creating
a LiveInstance entry.
+ * This is needed for enableInstance to work in tests.
+ */
+ private void createFakeMinionLiveInstance(String instanceId)
+ throws Exception {
+ HelixManager helixManager =
+ HelixManagerFactory.getZKHelixManager(getHelixClusterName(),
+ instanceId, org.apache.helix.InstanceType.PARTICIPANT, getZkUrl());
+ helixManager.connect();
+ _fakeMinionHelixManagers.add(helixManager);
+ }
+
+ @Test
+ public void testDrainMinionInstanceBasic() {
+ // Create a minion instance with minion_untagged tag
+ String minionHost = "minion-test-1.example.com";
+ int minionPort = 9514;
+ Instance minionInstance =
+ new Instance(minionHost, minionPort, InstanceType.MINION,
Collections.singletonList(
+ Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+
+ PinotResourceManagerResponse addResponse =
_helixResourceManager.addInstance(minionInstance, false);
+ assertTrue(addResponse.isSuccessful(), "Failed to add minion instance");
+
+ String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+ // Verify initial state has minion_untagged tag
+ InstanceConfig instanceConfig =
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+ assertNotNull(instanceConfig);
+ List<String> initialTags = instanceConfig.getTags();
+ assertEquals(initialTags.size(), 1);
+ assertTrue(initialTags.contains(Helix.UNTAGGED_MINION_INSTANCE));
+
+ // Drain the minion
+ PinotResourceManagerResponse drainResponse =
_helixResourceManager.drainMinionInstance(minionInstanceId);
+ assertTrue(drainResponse.isSuccessful(), "Failed to drain minion: " +
drainResponse.getMessage());
+
+ // Verify tags were updated
+ instanceConfig =
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+ List<String> drainedTags = instanceConfig.getTags();
+ assertEquals(drainedTags.size(), 1);
+ assertFalse(drainedTags.contains(Helix.UNTAGGED_MINION_INSTANCE));
+ assertTrue(drainedTags.contains(Helix.DRAINED_MINION_INSTANCE));
+
+ // Cleanup
+ _helixResourceManager.dropInstance(minionInstanceId);
+ }
+
+ @Test
+ public void testDrainMinionInstanceWithCustomTags() {
+ // Create a minion with custom tags - should succeed and replace ALL tags
+ String minionHost = "minion-test-2.example.com";
+ int minionPort = 9515;
+ List<String> tags = Arrays.asList(Helix.UNTAGGED_MINION_INSTANCE,
"custom_tag_1", "custom_tag_2");
+ Instance minionInstance = new Instance(minionHost, minionPort,
InstanceType.MINION,
+ tags, null, 0, 0, 0, 0, false);
+
+ PinotResourceManagerResponse addResponse =
_helixResourceManager.addInstance(minionInstance, false);
+ assertTrue(addResponse.isSuccessful());
+
+ String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+ // Drain the minion - should succeed
+ PinotResourceManagerResponse drainResponse =
_helixResourceManager.drainMinionInstance(minionInstanceId);
+ assertTrue(drainResponse.isSuccessful());
+
+ // Verify ALL tags were replaced with just minion_drained
+ InstanceConfig instanceConfig =
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+ List<String> drainedTags = instanceConfig.getTags();
+ assertEquals(drainedTags.size(), 1);
+ assertEquals(drainedTags.get(0), Helix.DRAINED_MINION_INSTANCE);
+
+ // Cleanup
+ _helixResourceManager.dropInstance(minionInstanceId);
+ }
+
+ @Test
+ public void testDrainMinionInstanceWithOnlyCustomTag() {
+ // Create a minion with only custom tag - should succeed and replace it
+ String minionHost = "minion-test-3.example.com";
+ int minionPort = 9516;
+ List<String> tags = Arrays.asList("custom_tag");
+ Instance minionInstance = new Instance(minionHost, minionPort,
InstanceType.MINION,
+ tags, null, 0, 0, 0, 0, false);
+
+ PinotResourceManagerResponse addResponse =
_helixResourceManager.addInstance(minionInstance, false);
+ assertTrue(addResponse.isSuccessful());
+
+ String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+ // Drain the minion - should succeed
+ PinotResourceManagerResponse drainResponse =
_helixResourceManager.drainMinionInstance(minionInstanceId);
+ assertTrue(drainResponse.isSuccessful());
+
+ // Verify tag was replaced with minion_drained
+ InstanceConfig instanceConfig =
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+ List<String> drainedTags = instanceConfig.getTags();
+ assertEquals(drainedTags.size(), 1);
+ assertEquals(drainedTags.get(0), Helix.DRAINED_MINION_INSTANCE);
+
+ // Cleanup
+ _helixResourceManager.dropInstance(minionInstanceId);
+ }
+
+ @Test
+ public void testDrainMinionInstanceAlreadyDrainedFails() {
+ // Create a minion that's already drained - draining it again should fail
+ String minionHost = "minion-test-4.example.com";
+ int minionPort = 9517;
+ List<String> tags = Arrays.asList(Helix.DRAINED_MINION_INSTANCE);
+ Instance minionInstance = new Instance(minionHost, minionPort,
InstanceType.MINION,
+ tags, null, 0, 0, 0, 0, false);
+
+ PinotResourceManagerResponse addResponse =
_helixResourceManager.addInstance(minionInstance, false);
+ assertTrue(addResponse.isSuccessful());
+
+ String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+ try {
+ // Attempt to drain the already drained minion - should return a
failure response
+ PinotResourceManagerResponse drainResponse =
_helixResourceManager.drainMinionInstance(minionInstanceId);
+ assertFalse(drainResponse.isSuccessful(), "Draining an already drained
minion should have failed");
+ } finally {
+ // Cleanup
+ _helixResourceManager.dropInstance(minionInstanceId);
+ }
+ }
+
+ @Test
+ public void testDrainMinionWithEmptyTags() {
+ // Create a minion with no tags
+ String minionHost = "minion-test-5.example.com";
+ int minionPort = 9518;
+ Instance minionInstance = new Instance(minionHost, minionPort,
InstanceType.MINION, null, null, 0, 0, 0, 0, false);
+
+ PinotResourceManagerResponse addResponse =
_helixResourceManager.addInstance(minionInstance, false);
+ assertTrue(addResponse.isSuccessful());
+
+ String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+ // Drain the minion
+ PinotResourceManagerResponse drainResponse =
_helixResourceManager.drainMinionInstance(minionInstanceId);
+ assertTrue(drainResponse.isSuccessful());
+
+ // Verify minion_drained tag was added
+ InstanceConfig instanceConfig =
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+ List<String> drainedTags = instanceConfig.getTags();
+ assertEquals(drainedTags.size(), 1);
+ assertTrue(drainedTags.contains(Helix.DRAINED_MINION_INSTANCE));
+
+ // Cleanup
+ _helixResourceManager.dropInstance(minionInstanceId);
+ }
+
+ @Test
+ public void testDrainNonExistentMinionFails() {
+ // Try to drain a non-existent minion
+ String nonExistentMinionId = "Minion_nonexistent.example.com_9999";
+ PinotResourceManagerResponse drainResponse =
_helixResourceManager.drainMinionInstance(nonExistentMinionId);
+ assertFalse(drainResponse.isSuccessful());
+ assertTrue(drainResponse.getMessage().contains("not found"));
+ }
+
+ @Test
+ public void testDrainMinionPreservesOtherInstanceConfigFields() {
+ // Create a minion with various config fields set
+ String minionHost = "minion-test-6.example.com";
+ int minionPort = 9519;
+ int grpcPort = 8090;
+ int adminPort = 8091;
+ Instance minionInstance =
+ new Instance(minionHost, minionPort, InstanceType.MINION,
Collections.singletonList(
+ Helix.UNTAGGED_MINION_INSTANCE), null, grpcPort, adminPort, 0, 0,
false);
+
+ PinotResourceManagerResponse addResponse =
_helixResourceManager.addInstance(minionInstance, false);
+ assertTrue(addResponse.isSuccessful());
+
+ String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+ // Verify initial config
+ InstanceConfig initialConfig =
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+ assertEquals(initialConfig.getHostName(), minionHost);
+ assertEquals(Integer.parseInt(initialConfig.getPort()), minionPort);
+
assertEquals(initialConfig.getRecord().getSimpleField(Helix.Instance.GRPC_PORT_KEY),
String.valueOf(grpcPort));
+
assertEquals(initialConfig.getRecord().getSimpleField(Helix.Instance.ADMIN_PORT_KEY),
String.valueOf(adminPort));
+
+ // Drain the minion
+ PinotResourceManagerResponse drainResponse =
_helixResourceManager.drainMinionInstance(minionInstanceId);
+ assertTrue(drainResponse.isSuccessful());
+
+ // Verify other config fields remain unchanged
+ InstanceConfig drainedConfig =
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+ assertEquals(drainedConfig.getHostName(), minionHost);
+ assertEquals(Integer.parseInt(drainedConfig.getPort()), minionPort);
+
assertEquals(drainedConfig.getRecord().getSimpleField(Helix.Instance.GRPC_PORT_KEY),
String.valueOf(grpcPort));
+
assertEquals(drainedConfig.getRecord().getSimpleField(Helix.Instance.ADMIN_PORT_KEY),
String.valueOf(adminPort));
+
+ // Verify only tags changed
+ List<String> drainedTags = drainedConfig.getTags();
+ assertEquals(drainedTags.size(), 1);
+ assertTrue(drainedTags.contains(Helix.DRAINED_MINION_INSTANCE));
+
+ // Cleanup
+ _helixResourceManager.dropInstance(minionInstanceId);
+ }
+
+ @Test
+ public void testMultipleDrainOperationsOnSameMinion() {
+ // Create a minion
+ String minionHost = "minion-test-7.example.com";
+ int minionPort = 9520;
+ Instance minionInstance =
+ new Instance(minionHost, minionPort, InstanceType.MINION,
Collections.singletonList(
+ Helix.UNTAGGED_MINION_INSTANCE), null, 0, 0, 0, 0, false);
+
+ PinotResourceManagerResponse addResponse =
_helixResourceManager.addInstance(minionInstance, false);
+ assertTrue(addResponse.isSuccessful());
+
+ String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+ // First drain should succeed
+ PinotResourceManagerResponse firstDrainResponse =
_helixResourceManager.drainMinionInstance(minionInstanceId);
+ assertTrue(firstDrainResponse.isSuccessful(), "First drain operation
failed");
+
+ // Verify state after first drain
+ InstanceConfig instanceConfig =
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+ List<String> tags = instanceConfig.getTags();
+ assertEquals(tags.size(), 1);
+ assertTrue(tags.contains(Helix.DRAINED_MINION_INSTANCE));
+ assertFalse(tags.contains(Helix.UNTAGGED_MINION_INSTANCE));
+
+ // Subsequent drain attempts should return a failure response
(already drained)
+ for (int i = 1; i < 3; i++) {
+ PinotResourceManagerResponse drainResponse =
_helixResourceManager.drainMinionInstance(minionInstanceId);
+ assertFalse(drainResponse.isSuccessful(), "Subsequent drain operation
" + (i + 1) + " should have failed");
+ }
+
+ // Verify final state remains unchanged
+ instanceConfig =
_helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+ tags = instanceConfig.getTags();
+ assertEquals(tags.size(), 1);
+ assertTrue(tags.contains(Helix.DRAINED_MINION_INSTANCE));
+
+ // Cleanup
+ _helixResourceManager.dropInstance(minionInstanceId);
+ }
+
+  /**
+   * Drains the same minion from several threads at once and verifies that at least one attempt succeeds and
+   * that the instance ends up tagged only with {@link Helix#DRAINED_MINION_INSTANCE}.
+   */
+  @Test
+  public void testDrainMinionInstanceConcurrentSafety()
+      throws InterruptedException {
+    // Create a minion with only standard tag
+    String minionHost = "minion-test-8.example.com";
+    int minionPort = 9521;
+    List<String> tags = Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE);
+    Instance minionInstance =
+        new Instance(minionHost, minionPort, InstanceType.MINION, tags, null, 0, 0, 0, 0, false);
+
+    PinotResourceManagerResponse addResponse = _helixResourceManager.addInstance(minionInstance, false);
+    assertTrue(addResponse.isSuccessful());
+
+    String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+    try {
+      // Drain from multiple threads concurrently
+      Thread[] threads = new Thread[5];
+      final boolean[] results = new boolean[threads.length];
+
+      for (int i = 0; i < threads.length; i++) {
+        final int index = i;
+        threads[i] = new Thread(() -> {
+          try {
+            PinotResourceManagerResponse drainResponse =
+                _helixResourceManager.drainMinionInstance(minionInstanceId);
+            results[index] = drainResponse.isSuccessful();
+          } catch (UnsupportedOperationException e) {
+            // Counted as a failed attempt, the same as an unsuccessful response
+            results[index] = false;
+          }
+        });
+        threads[i].start();
+      }
+
+      // Wait for all threads to complete; join() also makes each thread's write to results visible here
+      for (Thread thread : threads) {
+        thread.join();
+      }
+
+      // At least one operation should succeed; the others may fail once the minion already carries the
+      // drained tag
+      int successCount = 0;
+      for (boolean succeeded : results) {
+        if (succeeded) {
+          successCount++;
+        }
+      }
+      assertTrue(successCount >= 1, "At least one concurrent drain operation should succeed");
+
+      // Verify final state - should have minion_drained tag
+      InstanceConfig instanceConfig = _helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+      List<String> finalTags = instanceConfig.getTags();
+      assertEquals(finalTags.size(), 1);
+      assertTrue(finalTags.contains(Helix.DRAINED_MINION_INSTANCE));
+      assertFalse(finalTags.contains(Helix.UNTAGGED_MINION_INSTANCE));
+    } finally {
+      // Cleanup runs even when an assertion above fails, so the instance does not linger for later tests
+      _helixResourceManager.dropInstance(minionInstanceId);
+    }
+  }
+
+  /**
+   * Drains a minion tagged {@link Helix#UNTAGGED_MINION_INSTANCE}, checks that the pre-drain tags are stored
+   * under {@link Helix#PREVIOUS_TAGS}, then enables the instance and checks that the original tag is back and
+   * the stored list is cleared.
+   */
+  @Test
+  public void testDrainThenEnableRestoresOriginalTags() throws Exception {
+    // Create a minion with minion_untagged tag
+    String minionHost = "minion-test-9.example.com";
+    int minionPort = 9522;
+    List<String> originalTags = Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE);
+    Instance minionInstance = new Instance(minionHost, minionPort, InstanceType.MINION,
+        originalTags, null, 0, 0, 0, 0, false);
+
+    PinotResourceManagerResponse addResponse = _helixResourceManager.addInstance(minionInstance, false);
+    assertTrue(addResponse.isSuccessful());
+
+    String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+    try {
+      // Create a fake minion live instance (simulate minion joining) - needed for enableInstance to work
+      createFakeMinionLiveInstance(minionInstanceId);
+
+      // Drain the minion
+      PinotResourceManagerResponse drainResponse = _helixResourceManager.drainMinionInstance(minionInstanceId);
+      assertTrue(drainResponse.isSuccessful());
+
+      // Verify it's drained
+      InstanceConfig instanceConfig = _helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+      List<String> drainedTags = instanceConfig.getTags();
+      assertEquals(drainedTags.size(), 1);
+      assertEquals(drainedTags.get(0), Helix.DRAINED_MINION_INSTANCE);
+
+      // Verify pre-drain tags were stored
+      List<String> storedPreDrainTags = instanceConfig.getRecord().getListField(Helix.PREVIOUS_TAGS);
+      assertNotNull(storedPreDrainTags);
+      assertEquals(storedPreDrainTags, originalTags);
+
+      // Enable the minion - should restore original tags
+      PinotResourceManagerResponse enableResponse = _helixResourceManager.enableInstance(minionInstanceId);
+      assertTrue(enableResponse.isSuccessful());
+
+      // Verify tags were restored
+      instanceConfig = _helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+      List<String> restoredTags = instanceConfig.getTags();
+      assertEquals(restoredTags.size(), 1);
+      assertEquals(restoredTags.get(0), Helix.UNTAGGED_MINION_INSTANCE);
+
+      // Verify pre-drain tags were cleared
+      List<String> clearedPreDrainTags = instanceConfig.getRecord().getListField(Helix.PREVIOUS_TAGS);
+      assertTrue(clearedPreDrainTags == null || clearedPreDrainTags.isEmpty());
+    } finally {
+      // Cleanup is attempted even when an assertion above fails
+      _helixResourceManager.dropInstance(minionInstanceId);
+    }
+  }
+
+  /**
+   * Drains a minion that carries two custom tags, then enables it and checks that both custom tags are
+   * restored and {@link Helix#DRAINED_MINION_INSTANCE} is gone.
+   */
+  @Test
+  public void testDrainThenEnableRestoresCustomTags() throws Exception {
+    // Create a minion with custom tags
+    String minionHost = "minion-test-10.example.com";
+    int minionPort = 9523;
+    List<String> originalTags = Arrays.asList("minion_partition_A", "custom_tag");
+    Instance minionInstance = new Instance(minionHost, minionPort, InstanceType.MINION,
+        originalTags, null, 0, 0, 0, 0, false);
+
+    PinotResourceManagerResponse addResponse = _helixResourceManager.addInstance(minionInstance, false);
+    assertTrue(addResponse.isSuccessful());
+
+    String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+    try {
+      // Create a fake minion live instance (simulate minion joining) - needed for enableInstance to work
+      createFakeMinionLiveInstance(minionInstanceId);
+
+      // Drain the minion
+      PinotResourceManagerResponse drainResponse = _helixResourceManager.drainMinionInstance(minionInstanceId);
+      assertTrue(drainResponse.isSuccessful());
+
+      // Verify it's drained
+      InstanceConfig instanceConfig = _helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+      assertEquals(instanceConfig.getTags().size(), 1);
+      assertEquals(instanceConfig.getTags().get(0), Helix.DRAINED_MINION_INSTANCE);
+
+      // Enable the minion - should restore custom tags
+      PinotResourceManagerResponse enableResponse = _helixResourceManager.enableInstance(minionInstanceId);
+      assertTrue(enableResponse.isSuccessful());
+
+      // Verify custom tags were restored
+      instanceConfig = _helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+      List<String> restoredTags = instanceConfig.getTags();
+      assertEquals(restoredTags.size(), 2);
+      assertTrue(restoredTags.contains("minion_partition_A"));
+      assertTrue(restoredTags.contains("custom_tag"));
+      assertFalse(restoredTags.contains(Helix.DRAINED_MINION_INSTANCE));
+    } finally {
+      // Cleanup is attempted even when an assertion above fails
+      _helixResourceManager.dropInstance(minionInstanceId);
+    }
+  }
+
+  /**
+   * Enables a minion that was never drained and checks that its single
+   * {@link Helix#UNTAGGED_MINION_INSTANCE} tag is left untouched.
+   */
+  @Test
+  public void testEnableNonDrainedMinionDoesNotModifyTags() throws Exception {
+    // Create a normal minion
+    String minionHost = "minion-test-11.example.com";
+    int minionPort = 9524;
+    List<String> originalTags = Collections.singletonList(Helix.UNTAGGED_MINION_INSTANCE);
+    Instance minionInstance = new Instance(minionHost, minionPort, InstanceType.MINION,
+        originalTags, null, 0, 0, 0, 0, false);
+
+    PinotResourceManagerResponse addResponse = _helixResourceManager.addInstance(minionInstance, false);
+    assertTrue(addResponse.isSuccessful());
+
+    String minionInstanceId = "Minion_" + minionHost + "_" + minionPort;
+
+    try {
+      // Create a fake minion live instance (simulate minion joining) - needed for enableInstance to work
+      createFakeMinionLiveInstance(minionInstanceId);
+
+      // Enable the minion (not drained) - tags should remain unchanged
+      PinotResourceManagerResponse enableResponse = _helixResourceManager.enableInstance(minionInstanceId);
+      assertTrue(enableResponse.isSuccessful());
+
+      // Verify tags were not modified
+      InstanceConfig instanceConfig = _helixResourceManager.getHelixInstanceConfig(minionInstanceId);
+      List<String> tags = instanceConfig.getTags();
+      assertEquals(tags.size(), 1);
+      assertEquals(tags.get(0), Helix.UNTAGGED_MINION_INSTANCE);
+    } finally {
+      // Cleanup is attempted even when an assertion above fails
+      _helixResourceManager.dropInstance(minionInstanceId);
+    }
+  }
+
+  /**
+   * Disconnects every fake minion Helix manager that is still connected, then stops the controller and ZK.
+   * The shutdown calls are chained through {@code finally} so that a failure in one step does not skip the
+   * remaining ones.
+   */
+  @AfterClass
+  public void tearDown() {
+    try {
+      // Disconnect fake minion managers
+      for (HelixManager manager : _fakeMinionHelixManagers) {
+        if (manager != null && manager.isConnected()) {
+          manager.disconnect();
+        }
+      }
+    } finally {
+      _fakeMinionHelixManagers.clear();
+      // Stop the controller and ZK even if a disconnect (or the controller stop itself) throws
+      try {
+        stopController();
+      } finally {
+        stopZk();
+      }
+    }
+  }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 69862016e8c..2779827ab1c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -159,6 +159,9 @@ public class CommonConstants {
public static final String UNTAGGED_BROKER_INSTANCE = "broker_untagged";
public static final String UNTAGGED_SERVER_INSTANCE = "server_untagged";
public static final String UNTAGGED_MINION_INSTANCE = "minion_untagged";
+ public static final String DRAINED_MINION_INSTANCE = "minion_drained";
+
+ public static final String PREVIOUS_TAGS = "previousTags";
public static class StateModel {
public static class SegmentStateModel {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]