This is an automated email from the ASF dual-hosted git repository.
vvivekiyer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d206f127f3 Allowing users to pass minionInstanceTag as a param in
/tasks/schedule API (#12786)
d206f127f3 is described below
commit d206f127f33f2843c288b483e16e22963dac6b4f
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Thu Apr 18 03:44:20 2024 +0530
Allowing users to pass minionInstanceTag as a param in /tasks/schedule API
(#12786)
* Allowing users to pass minionInstanceTag in tasks/schedule API
* add nullable annotation
---
.../api/resources/PinotTaskRestletResource.java | 11 +++--
.../helix/core/minion/PinotTaskManager.java | 48 ++++++++++++++--------
.../tests/SimpleMinionClusterIntegrationTest.java | 4 +-
3 files changed, 40 insertions(+), 23 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index c51c266587..e09bde8466 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -618,19 +618,22 @@ public class PinotTaskRestletResource {
@ApiOperation("Schedule tasks and return a map from task type to task name
scheduled")
public Map<String, String> scheduleTasks(@ApiParam(value = "Task type")
@QueryParam("taskType") String taskType,
@ApiParam(value = "Table name (with type suffix)")
@QueryParam("tableName") String tableName,
+ @ApiParam(value = "Minion Instance tag to schedule the task explicitly
on")
+ @QueryParam("minionInstanceTag") @Nullable String minionInstanceTag,
@Context HttpHeaders headers) {
String database = headers != null ? headers.getHeaderString(DATABASE) :
DEFAULT_DATABASE;
if (taskType != null) {
// Schedule task for the given task type
List<String> taskNames = tableName != null
- ? _pinotTaskManager.scheduleTask(taskType,
DatabaseUtils.translateTableName(tableName, headers))
- : _pinotTaskManager.scheduleTaskForDatabase(taskType, database);
+ ? _pinotTaskManager.scheduleTask(taskType,
+ DatabaseUtils.translateTableName(tableName, headers),
minionInstanceTag)
+ : _pinotTaskManager.scheduleTaskForDatabase(taskType, database,
minionInstanceTag);
return Collections.singletonMap(taskType, taskNames == null ? null :
StringUtils.join(taskNames, ','));
} else {
// Schedule tasks for all task types
Map<String, List<String>> allTaskNames = tableName != null
- ?
_pinotTaskManager.scheduleTasks(DatabaseUtils.translateTableName(tableName,
headers))
- : _pinotTaskManager.scheduleTasksForDatabase(database);
+ ?
_pinotTaskManager.scheduleTasks(DatabaseUtils.translateTableName(tableName,
headers), minionInstanceTag)
+ : _pinotTaskManager.scheduleTasksForDatabase(database,
minionInstanceTag);
return allTaskNames.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry ->
String.join(",", entry.getValue())));
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 2cdbf8c1df..4029944139 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -485,7 +485,7 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
* Returns a map from the task type to the list of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleTasks() {
- return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false);
+ return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false,
null);
}
/**
@@ -493,15 +493,17 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
*/
- public synchronized Map<String, List<String>>
scheduleTasksForDatabase(@Nullable String database) {
- return scheduleTasks(_pinotHelixResourceManager.getAllTables(database),
false);
+ public synchronized Map<String, List<String>>
scheduleTasksForDatabase(@Nullable String database,
+ @Nullable String minionInstanceTag) {
+ return scheduleTasks(_pinotHelixResourceManager.getAllTables(database),
false, minionInstanceTag);
}
/**
* Helper method to schedule tasks (all task types) for the given tables
that have the tasks enabled. Returns a map
* from the task type to the list of the tasks scheduled.
*/
- private synchronized Map<String, List<String>> scheduleTasks(List<String>
tableNamesWithType, boolean isLeader) {
+ private synchronized Map<String, List<String>> scheduleTasks(List<String>
tableNamesWithType,
+ boolean isLeader, @Nullable String minionInstanceTag) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
1L);
// Scan all table configs to get the tables with tasks enabled
@@ -525,7 +527,7 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
if (taskGenerator != null) {
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
addTaskTypeMetricsUpdaterIfNeeded(taskType);
- tasksScheduled.put(taskType, scheduleTask(taskGenerator,
enabledTableConfigs, isLeader));
+ tasksScheduled.put(taskType, scheduleTask(taskGenerator,
enabledTableConfigs, isLeader, minionInstanceTag));
} else {
List<String> enabledTables = new
ArrayList<>(enabledTableConfigs.size());
for (TableConfig enabledTableConfig : enabledTableConfigs) {
@@ -545,14 +547,15 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
*/
@Nullable
private List<String> scheduleTask(PinotTaskGenerator taskGenerator,
List<TableConfig> enabledTableConfigs,
- boolean isLeader) {
+ boolean isLeader, @Nullable String minionInstanceTagForTask) {
LOGGER.info("Trying to schedule task type: {}, isLeader: {}",
taskGenerator.getTaskType(), isLeader);
Map<String, List<PinotTaskConfig>> minionInstanceTagToTaskConfigs = new
HashMap<>();
String taskType = taskGenerator.getTaskType();
for (TableConfig tableConfig : enabledTableConfigs) {
String tableName = tableConfig.getTableName();
try {
- String minionInstanceTag =
taskGenerator.getMinionInstanceTag(tableConfig);
+ String minionInstanceTag = minionInstanceTagForTask != null
+ ? minionInstanceTagForTask :
taskGenerator.getMinionInstanceTag(tableConfig);
List<PinotTaskConfig> presentTaskConfig =
minionInstanceTagToTaskConfigs.computeIfAbsent(minionInstanceTag,
k -> new ArrayList<>());
taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
@@ -624,7 +627,16 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
* controller. Returns a map from the task type to the list of tasks
scheduled.
*/
public synchronized Map<String, List<String>> scheduleTasks(String
tableNameWithType) {
- return scheduleTasks(Collections.singletonList(tableNameWithType), false);
+ return scheduleTasks(Collections.singletonList(tableNameWithType), false,
null);
+ }
+
+ /**
+ * Public API to schedule tasks (all task types) for the given table on a
specific instance tag.
+ * It might be called from the non-leader controller. Returns a map from the
task type to the list of tasks scheduled.
+ */
+ public synchronized Map<String, List<String>> scheduleTasks(String
tableNameWithType,
+ @Nullable String minionInstanceTag) {
+ return scheduleTasks(Collections.singletonList(tableNameWithType), false,
minionInstanceTag);
}
/**
@@ -633,8 +645,8 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
* Returns the list of task names, or {@code null} if no task is scheduled.
*/
@Nullable
- public synchronized List<String> scheduleTask(String taskType) {
- return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables());
+ public synchronized List<String> scheduleTask(String taskType, @Nullable
String minionInstanceTag) {
+ return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(),
minionInstanceTag);
}
/**
@@ -643,12 +655,13 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
* Returns the list of task name, or {@code null} if no task is scheduled.
*/
@Nullable
- public synchronized List<String> scheduleTaskForDatabase(String taskType,
@Nullable String database) {
- return scheduleTask(taskType,
_pinotHelixResourceManager.getAllTables(database));
+ public synchronized List<String> scheduleTaskForDatabase(String taskType,
@Nullable String database,
+ @Nullable String minionInstanceTag) {
+ return scheduleTask(taskType,
_pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
}
@Nullable
- private List<String> scheduleTask(String taskType, List<String> tables) {
+ private List<String> scheduleTask(String taskType, List<String> tables,
@Nullable String minionInstanceTag) {
PinotTaskGenerator taskGenerator =
_taskGeneratorRegistry.getTaskGenerator(taskType);
Preconditions.checkState(taskGenerator != null, "Task type: %s is not
registered", taskType);
@@ -664,7 +677,7 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
addTaskTypeMetricsUpdaterIfNeeded(taskType);
- return scheduleTask(taskGenerator, enabledTableConfigs, false);
+ return scheduleTask(taskGenerator, enabledTableConfigs, false,
minionInstanceTag);
}
/**
@@ -672,7 +685,8 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
* controller. Returns the list of task names, or {@code null} if no task is
scheduled.
*/
@Nullable
- public synchronized List<String> scheduleTask(String taskType, String
tableNameWithType) {
+ public synchronized List<String> scheduleTask(String taskType, String
tableNameWithType,
+ @Nullable String minionInstanceTag) {
PinotTaskGenerator taskGenerator =
_taskGeneratorRegistry.getTaskGenerator(taskType);
Preconditions.checkState(taskGenerator != null, "Task type: %s is not
registered", taskType);
@@ -685,12 +699,12 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
addTaskTypeMetricsUpdaterIfNeeded(taskType);
- return scheduleTask(taskGenerator, Collections.singletonList(tableConfig),
false);
+ return scheduleTask(taskGenerator, Collections.singletonList(tableConfig),
false, minionInstanceTag);
}
@Override
protected void processTables(List<String> tableNamesWithType, Properties
taskProperties) {
- scheduleTasks(tableNamesWithType, true);
+ scheduleTasks(tableNamesWithType, true, null);
}
@Override
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 1db953f00f..241c1c0876 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -150,7 +150,7 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
verifyTaskCount(task1.get(0), 0, 1, 1, 2);
// Should generate one more task, with two sub-tasks. Both of these
sub-tasks will wait
// since we have one minion instance that is still running one of the
sub-tasks.
- List<String> task2 = _taskManager.scheduleTask(TASK_TYPE);
+ List<String> task2 = _taskManager.scheduleTask(TASK_TYPE, null);
assertNotNull(task2);
assertEquals(task2.size(), 1);
assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2.get(0)));
@@ -160,7 +160,7 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
// Our test task generator does not generate if there are already this
many sub-tasks in the
// running+waiting count already.
assertNull(_taskManager.scheduleTasks().get(TASK_TYPE));
- assertNull(_taskManager.scheduleTask(TASK_TYPE));
+ assertNull(_taskManager.scheduleTask(TASK_TYPE, null));
// Wait at most 60 seconds for all tasks IN_PROGRESS
TestUtils.waitForCondition(input -> {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org