yashmayya commented on code in PR #16307:
URL: https://github.com/apache/pinot/pull/16307#discussion_r2206956361
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -463,6 +473,68 @@ public SuccessResponse deleteTable(
"Table '" + tableName + "' with type " + tableType + " does not
exist", Response.Status.NOT_FOUND);
}
+ public static void tableTasksValidation(TableConfig tableConfig,
+ PinotHelixTaskResourceManager pinotHelixTaskResourceManager) {
+ if (tableConfig.getTaskConfig() == null) {
+ return;
+ }
+ String tableWithType = tableConfig.getTableName();
+ Map<String, Map<String, String>> taskTypeConfigsMap =
tableConfig.getTaskConfig().getTaskTypeConfigsMap();
+ for (String taskType : taskTypeConfigsMap.keySet()) {
+ Map<String, TaskState> taskStates;
+ try {
+ taskStates =
pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableWithType);
+ } catch (IllegalArgumentException e) {
+ LOGGER.info(e.getMessage());
+ return;
+ }
+ if (!taskStates.isEmpty()) {
+ throw new RuntimeException("The table has dangling task data, try
performing table delete operation in case "
+ + "the delete operation was not completed successfully, else
delete the tasks manually through "
+ + "DELETE /tasks/task/{taskName} endpoint."
+ + "Please try again once the dangling tasks are cleaned up");
Review Comment:
+1 on this comment: `RuntimeException` will probably result in a 500
response, which is likely not what we want here.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -463,6 +473,68 @@ public SuccessResponse deleteTable(
"Table '" + tableName + "' with type " + tableType + " does not
exist", Response.Status.NOT_FOUND);
}
+ public static void tableTasksValidation(TableConfig tableConfig,
+ PinotHelixTaskResourceManager pinotHelixTaskResourceManager) {
+ if (tableConfig.getTaskConfig() == null) {
+ return;
+ }
+ String tableWithType = tableConfig.getTableName();
+ Map<String, Map<String, String>> taskTypeConfigsMap =
tableConfig.getTaskConfig().getTaskTypeConfigsMap();
+ for (String taskType : taskTypeConfigsMap.keySet()) {
+ Map<String, TaskState> taskStates;
+ try {
+ taskStates =
pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableWithType);
+ } catch (IllegalArgumentException e) {
+ LOGGER.info(e.getMessage());
+ return;
+ }
+ if (!taskStates.isEmpty()) {
+ throw new RuntimeException("The table has dangling task data, try
performing table delete operation in case "
+ + "the delete operation was not completed successfully, else
delete the tasks manually through "
+ + "DELETE /tasks/task/{taskName} endpoint."
+ + "Please try again once the dangling tasks are cleaned up");
+ }
+ }
+ }
+
+ public static void tableTasksCleanup(String tableWithType, boolean
ignoreActiveTasks,
+ PinotHelixResourceManager pinotHelixResourceManager,
PinotHelixTaskResourceManager pinotHelixTaskResourceManager)
+ throws IOException {
+ TableConfig tableConfig =
pinotHelixResourceManager.getTableConfig(tableWithType);
+ if (tableConfig == null || tableConfig.getTaskConfig() == null) {
+ return;
+ }
+ Map<String, Map<String, String>> taskTypeConfigsMap =
tableConfig.getTaskConfig().getTaskTypeConfigsMap();
+ Set<String> taskTypes = taskTypeConfigsMap.keySet();
+ for (String taskType : taskTypes) {
+ // remove the task schedules to avoid task being scheduled during table
deletion
+ taskTypeConfigsMap.get(taskType).remove(PinotTaskManager.SCHEDULE_KEY);
+ }
+ pinotHelixResourceManager.updateTableConfig(tableConfig);
+ List<String> pendingTasks = new ArrayList<>();
+ for (String taskType : taskTypes) {
+ Map<String, TaskState> taskStates;
+ try {
+ taskStates =
pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableWithType);
+ } catch (IllegalArgumentException e) {
+ LOGGER.info(e.getMessage());
+ continue;
+ }
+ for (String taskName : taskStates.keySet()) {
+ if (TaskState.IN_PROGRESS.equals(taskStates.get(taskName))) {
+ pendingTasks.add(taskName);
+ } else {
+ pinotHelixTaskResourceManager.deleteTask(taskName, true);
+ }
+ }
+ }
+ if (!ignoreActiveTasks && !pendingTasks.isEmpty()) {
+ throw new RuntimeException("The table has " + pendingTasks.size() + "
active running tasks : " + pendingTasks
+ + ". The task schedules have been cleared, so no new task should be
generated by pinot. "
Review Comment:
```suggestion
+ ". The task schedules have been cleared, so no new task should
be generated. "
```
nit: `by pinot` isn't necessary and sounds a little strange.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -463,6 +473,68 @@ public SuccessResponse deleteTable(
"Table '" + tableName + "' with type " + tableType + " does not
exist", Response.Status.NOT_FOUND);
}
+ public static void tableTasksValidation(TableConfig tableConfig,
+ PinotHelixTaskResourceManager pinotHelixTaskResourceManager) {
+ if (tableConfig.getTaskConfig() == null) {
+ return;
+ }
+ String tableWithType = tableConfig.getTableName();
+ Map<String, Map<String, String>> taskTypeConfigsMap =
tableConfig.getTaskConfig().getTaskTypeConfigsMap();
+ for (String taskType : taskTypeConfigsMap.keySet()) {
+ Map<String, TaskState> taskStates;
+ try {
+ taskStates =
pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableWithType);
+ } catch (IllegalArgumentException e) {
+ LOGGER.info(e.getMessage());
+ return;
+ }
+ if (!taskStates.isEmpty()) {
+ throw new RuntimeException("The table has dangling task data, try
performing table delete operation in case "
+ + "the delete operation was not completed successfully, else
delete the tasks manually through "
+ + "DELETE /tasks/task/{taskName} endpoint."
+ + "Please try again once the dangling tasks are cleaned up");
+ }
+ }
+ }
+
+ public static void tableTasksCleanup(String tableWithType, boolean
ignoreActiveTasks,
+ PinotHelixResourceManager pinotHelixResourceManager,
PinotHelixTaskResourceManager pinotHelixTaskResourceManager)
+ throws IOException {
+ TableConfig tableConfig =
pinotHelixResourceManager.getTableConfig(tableWithType);
+ if (tableConfig == null || tableConfig.getTaskConfig() == null) {
+ return;
+ }
+ Map<String, Map<String, String>> taskTypeConfigsMap =
tableConfig.getTaskConfig().getTaskTypeConfigsMap();
+ Set<String> taskTypes = taskTypeConfigsMap.keySet();
+ for (String taskType : taskTypes) {
+ // remove the task schedules to avoid task being scheduled during table
deletion
+ taskTypeConfigsMap.get(taskType).remove(PinotTaskManager.SCHEDULE_KEY);
+ }
+ pinotHelixResourceManager.updateTableConfig(tableConfig);
+ List<String> pendingTasks = new ArrayList<>();
+ for (String taskType : taskTypes) {
+ Map<String, TaskState> taskStates;
+ try {
+ taskStates =
pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableWithType);
+ } catch (IllegalArgumentException e) {
+ LOGGER.info(e.getMessage());
+ continue;
+ }
+ for (String taskName : taskStates.keySet()) {
+ if (TaskState.IN_PROGRESS.equals(taskStates.get(taskName))) {
+ pendingTasks.add(taskName);
+ } else {
+ pinotHelixTaskResourceManager.deleteTask(taskName, true);
+ }
+ }
+ }
+ if (!ignoreActiveTasks && !pendingTasks.isEmpty()) {
+ throw new RuntimeException("The table has " + pendingTasks.size() + "
active running tasks : " + pendingTasks
Review Comment:
Same comment as above on the exception type.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -463,6 +473,68 @@ public SuccessResponse deleteTable(
"Table '" + tableName + "' with type " + tableType + " does not
exist", Response.Status.NOT_FOUND);
}
+ public static void tableTasksValidation(TableConfig tableConfig,
+ PinotHelixTaskResourceManager pinotHelixTaskResourceManager) {
+ if (tableConfig.getTaskConfig() == null) {
+ return;
+ }
+ String tableWithType = tableConfig.getTableName();
+ Map<String, Map<String, String>> taskTypeConfigsMap =
tableConfig.getTaskConfig().getTaskTypeConfigsMap();
+ for (String taskType : taskTypeConfigsMap.keySet()) {
+ Map<String, TaskState> taskStates;
+ try {
+ taskStates =
pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableWithType);
+ } catch (IllegalArgumentException e) {
+ LOGGER.info(e.getMessage());
+ return;
+ }
+ if (!taskStates.isEmpty()) {
+ throw new RuntimeException("The table has dangling task data, try
performing table delete operation in case "
+ + "the delete operation was not completed successfully, else
delete the tasks manually through "
+ + "DELETE /tasks/task/{taskName} endpoint."
+ + "Please try again once the dangling tasks are cleaned up");
Review Comment:
nit: there is no space between the sentences (`endpoint.` runs straight into `Please try again`).
##########
pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java:
##########
@@ -1090,6 +1052,192 @@ private static InstanceAssignmentConfig
getInstanceAssignmentConfig(String tag,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.name(),
false);
}
+ @Test
+ public void testTableTasksValidationWithNoDanglingTasks()
+ throws Exception {
+ String tableName = "testTableTasksValidation";
+ DEFAULT_INSTANCE.addDummySchema(tableName);
+
+ TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
+ .setTaskConfig(new TableTaskConfig(ImmutableMap.of(
+ MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
ImmutableMap.of())))
+ .build();
+
+ // Should succeed when no dangling tasks exist
+ String creationResponse = sendPostRequest(_createTableUrl,
offlineTableConfig.toJsonString());
+ assertEquals(creationResponse,
+ "{\"unrecognizedProperties\":{},\"status\":\"Table
testTableTasksValidation_OFFLINE successfully added\"}");
+
+ // Clean up
+
sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
+ }
+
+ @Test
+ public void testTableTasksValidationWithDanglingTasks()
+ throws Exception {
+ String tableName = "testTableTasksValidationWithDangling";
+ DEFAULT_INSTANCE.addDummySchema(tableName);
+
+ TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
+ .setTaskConfig(new TableTaskConfig(ImmutableMap.of(
+ MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+ ImmutableMap.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *",
+ CommonConstants.TABLE_NAME, tableName + "_OFFLINE"))))
+ .build();
+
+ // First create the table successfully
+ sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+
+ // Create a task manually to simulate dangling task
+ PinotTaskManager taskManager =
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
+ TaskSchedulingContext context = new TaskSchedulingContext();
+ context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
+ Map<String, TaskSchedulingInfo> taskInfo =
taskManager.scheduleTasks(context);
+ String taskName =
taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
+ waitForTaskState(taskName, TaskState.IN_PROGRESS);
+
+ // Now try to create another table with same name (simulating re-creation
with dangling tasks)
+ sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder()
+ .forTableDelete(tableName + "?ignoreActiveTasks=true"));
+
+ try {
+ sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+ fail("Table creation should fail when dangling tasks exist");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("The table has dangling task data"));
+ }
+
+ // Clean up any remaining tasks
+ try {
+ sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder()
+ .forTableDelete(tableName + "?ignoreActiveTasks=true"));
+ } catch (Exception ignored) {
+ // Ignore if table doesn't exist
+ }
+ }
+
+ @Test
+ public void testTableTasksValidationWithNullTaskConfig()
+ throws Exception {
+ String tableName = "testTableTasksValidationNullConfig";
+ DEFAULT_INSTANCE.addDummySchema(tableName);
+
+ TableConfig offlineTableConfig =
getOfflineTableBuilder(tableName).build(); // No task config
+
+ // Should succeed when task config is null
+ String creationResponse = sendPostRequest(_createTableUrl,
offlineTableConfig.toJsonString());
+ assertEquals(creationResponse, "{\"unrecognizedProperties\":{},"
+ + "\"status\":\"Table testTableTasksValidationNullConfig_OFFLINE
successfully added\"}");
+
+ // Clean up
+
sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
+ }
+
+ @Test
+ public void testTableTasksCleanupWithNonActiveTasks()
+ throws Exception {
+ String tableName = "testTableTasksCleanup";
+ DEFAULT_INSTANCE.addDummySchema(tableName);
+
+ TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
+ .setTaskConfig(new TableTaskConfig(ImmutableMap.of(
+ MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+ ImmutableMap.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *",
+ CommonConstants.TABLE_NAME, tableName + "_OFFLINE"))))
+ .build();
+
+ // Create table
+ sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+
+ // Create some completed tasks
+ PinotTaskManager taskManager =
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
+ TaskSchedulingContext context = new TaskSchedulingContext();
+ context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
+ Map<String, TaskSchedulingInfo> taskInfo =
taskManager.scheduleTasks(context);
+ String taskName =
taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
+ waitForTaskState(taskName, TaskState.IN_PROGRESS);
+
+ // stop the task queue to abort the task
+ sendPutRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder()
+
.forStopMinionTaskQueue(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+ waitForTaskState(taskName, TaskState.STOPPED);
+ // resume the task queue again to avoid affecting other tests
+ sendPutRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder()
+
.forResumeMinionTaskQueue(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+
+ // Delete table - should succeed and clean up tasks
+ String deleteResponse = sendDeleteRequest(
+
DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
+ assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableName +
"_OFFLINE] deleted\"}");
+ }
+
+ private static void waitForTaskState(String taskName, TaskState
expectedState) {
+ TestUtils.waitForCondition((aVoid) -> {
+ String response;
+ try {
+ response =
sendGetRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forMinionTaskState(taskName));
+ } catch (IOException e) {
+ return false;
+ }
+ return response.replace("\"", "").equals(expectedState.name());
+ }, 5000, "Task not scheduled");
+ }
+
+ @Test
+ public void testTableTasksCleanupWithActiveTasks()
+ throws Exception {
+ String tableName = "testTableTasksCleanupActive";
+ DEFAULT_INSTANCE.addDummySchema(tableName);
+
+ TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
+ .setTaskConfig(new TableTaskConfig(ImmutableMap.of(
+ MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+ ImmutableMap.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *",
+ CommonConstants.TABLE_NAME, tableName + "_OFFLINE"))))
+ .build();
+
+ // Create table
+ sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+
+ // Create an active/in-progress task
+ PinotTaskManager taskManager =
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
+ TaskSchedulingContext context = new TaskSchedulingContext();
+ context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
+ Map<String, TaskSchedulingInfo> taskInfo =
taskManager.scheduleTasks(context);
+ String taskName =
taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
+ waitForTaskState(taskName, TaskState.IN_PROGRESS);
+ try {
+ // Try to delete table without ignoring active tasks - should fail
+
sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
+ fail("Table deletion should fail when active tasks exist");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("The table has") &&
e.getMessage().contains("active running tasks"));
+ }
+
+ // Delete table with ignoreActiveTasks flag - should succeed
+ String deleteResponse = sendDeleteRequest(
+
DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName +
"?ignoreActiveTasks=true"));
+ assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableName +
"_OFFLINE] deleted\"}");
+ // delete task queue
Review Comment:
Is this step missing?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]