This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f616be74201 Perform task data cleanup as part of table deletion (#16307)
f616be74201 is described below
commit f616be74201ea0e68bf01443e2db4aab19a79027
Author: Shounak kulkarni <[email protected]>
AuthorDate: Wed Jul 16 17:53:17 2025 +0530
Perform task data cleanup as part of table deletion (#16307)
---
.../api/resources/PinotTableRestletResource.java | 83 ++++-
.../api/resources/TableConfigsRestletResource.java | 26 +-
.../core/minion/PinotHelixTaskResourceManager.java | 7 +-
.../api/PinotTableRestletResourceTest.java | 349 +++++++++++++++------
.../utils/builder/ControllerRequestURLBuilder.java | 16 +
5 files changed, 373 insertions(+), 108 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index bd97c37d1e5..7f1dae48634 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -72,6 +72,7 @@ import
org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.helix.AccessOption;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.task.TaskState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.exception.RebalanceInProgressException;
@@ -207,8 +208,10 @@ public class PinotTableRestletResource {
@ManualAuthorization // performed after parsing table configs
public ConfigSuccessResponse addTable(String tableConfigStr,
@ApiParam(value = "comma separated list of validation type(s) to skip.
supported types: (ALL|TASK|UPSERT)")
- @QueryParam("validationTypesToSkip") @Nullable String typesToSkip,
@Context HttpHeaders httpHeaders,
- @Context Request request) {
+ @QueryParam("validationTypesToSkip") @Nullable String typesToSkip,
+ @ApiParam(defaultValue = "false") @QueryParam("ignoreActiveTasks")
boolean ignoreActiveTasks,
+ @Context HttpHeaders httpHeaders, @Context Request request)
+ throws IOException {
// TODO introduce a table config ctor with json string.
Pair<TableConfig, Map<String, Object>>
tableConfigAndUnrecognizedProperties;
TableConfig tableConfig;
@@ -246,6 +249,9 @@ public class PinotTableRestletResource {
} catch (Exception e) {
throw new InvalidTableConfigException(e);
}
+ if (!ignoreActiveTasks) {
+ tableTasksValidation(tableConfig, _pinotHelixTaskResourceManager);
+ }
_pinotHelixResourceManager.addTable(tableConfig);
// TODO: validate that table was created successfully
// (in realtime case, metadata might not have been created but would be
created successfully in the next run of
@@ -260,6 +266,8 @@ public class PinotTableRestletResource {
throw new ControllerApplicationException(LOGGER, errStr,
Response.Status.BAD_REQUEST, e);
} else if (e instanceof TableAlreadyExistsException) {
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.CONFLICT, e);
+ } else if (e instanceof ControllerApplicationException) {
+ throw e;
} else {
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
}
@@ -426,6 +434,7 @@ public class PinotTableRestletResource {
@ApiParam(value = "Retention period for the table segments (e.g. 12h,
3d); If not set, the retention period "
+ "will default to the first config that's not null: the cluster
setting, then '7d'. Using 0d or -1d will "
+ "instantly delete segments without retention")
@QueryParam("retention") String retentionPeriod,
+ @ApiParam(defaultValue = "false") @QueryParam("ignoreActiveTasks")
boolean ignoreActiveTasks,
@Context HttpHeaders headers) {
TableType tableType = Constants.validateTableType(tableTypeStr);
@@ -435,21 +444,25 @@ public class PinotTableRestletResource {
validateLogicalTableReference(tableName, tableType);
boolean tableExist = false;
if (verifyTableType(tableName, tableType, TableType.OFFLINE)) {
+ String tableWithType =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+ tableTasksCleanup(tableWithType, ignoreActiveTasks,
_pinotHelixResourceManager, _pinotHelixTaskResourceManager);
tableExist = _pinotHelixResourceManager.hasOfflineTable(tableName);
// Even the table name does not exist, still go on to delete remaining
table metadata in case a previous delete
// did not complete.
_pinotHelixResourceManager.deleteOfflineTable(tableName,
retentionPeriod);
if (tableExist) {
-
tablesDeleted.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
+ tablesDeleted.add(tableWithType);
}
}
if (verifyTableType(tableName, tableType, TableType.REALTIME)) {
+ String tableWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ tableTasksCleanup(tableWithType, ignoreActiveTasks,
_pinotHelixResourceManager, _pinotHelixTaskResourceManager);
tableExist = _pinotHelixResourceManager.hasRealtimeTable(tableName);
// Even the table name does not exist, still go on to delete remaining
table metadata in case a previous delete
// did not complete.
_pinotHelixResourceManager.deleteRealtimeTable(tableName,
retentionPeriod);
if (tableExist) {
-
tablesDeleted.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
+ tablesDeleted.add(tableWithType);
}
}
if (!tablesDeleted.isEmpty()) {
@@ -463,6 +476,68 @@ public class PinotTableRestletResource {
"Table '" + tableName + "' with type " + tableType + " does not
exist", Response.Status.NOT_FOUND);
}
+ public static void tableTasksValidation(TableConfig tableConfig,
+ PinotHelixTaskResourceManager pinotHelixTaskResourceManager) {
+ if (tableConfig.getTaskConfig() == null) {
+ return;
+ }
+ String tableWithType = tableConfig.getTableName();
+ Map<String, Map<String, String>> taskTypeConfigsMap =
tableConfig.getTaskConfig().getTaskTypeConfigsMap();
+ for (String taskType : taskTypeConfigsMap.keySet()) {
+ Map<String, TaskState> taskStates;
+ try {
+ taskStates =
pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableWithType);
+ } catch (IllegalArgumentException e) {
+ LOGGER.info(e.getMessage());
+ return;
+ }
+ if (!taskStates.isEmpty()) {
+ throw new ControllerApplicationException(LOGGER, "The table has dangling task data, try performing table "
+ + "delete operation in case the delete operation was not completed successfully, else delete the tasks "
+ + "manually through DELETE /tasks/task/{taskName} endpoint. Please try again once the dangling tasks are "
+ + "cleaned up", Response.Status.BAD_REQUEST);
+ }
+ }
+ }
+
+ public static void tableTasksCleanup(String tableWithType, boolean
ignoreActiveTasks,
+ PinotHelixResourceManager pinotHelixResourceManager,
PinotHelixTaskResourceManager pinotHelixTaskResourceManager)
+ throws IOException {
+ TableConfig tableConfig =
pinotHelixResourceManager.getTableConfig(tableWithType);
+ if (tableConfig == null || tableConfig.getTaskConfig() == null) {
+ return;
+ }
+ Map<String, Map<String, String>> taskTypeConfigsMap =
tableConfig.getTaskConfig().getTaskTypeConfigsMap();
+ Set<String> taskTypes = taskTypeConfigsMap.keySet();
+ for (String taskType : taskTypes) {
+ // remove the task schedules to avoid task being scheduled during table
deletion
+ taskTypeConfigsMap.get(taskType).remove(PinotTaskManager.SCHEDULE_KEY);
+ }
+ pinotHelixResourceManager.updateTableConfig(tableConfig);
+ List<String> pendingTasks = new ArrayList<>();
+ for (String taskType : taskTypes) {
+ Map<String, TaskState> taskStates;
+ try {
+ taskStates =
pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableWithType);
+ } catch (IllegalArgumentException e) {
+ LOGGER.info(e.getMessage());
+ continue;
+ }
+ for (String taskName : taskStates.keySet()) {
+ if (TaskState.IN_PROGRESS.equals(taskStates.get(taskName))) {
+ pendingTasks.add(taskName);
+ } else {
+ pinotHelixTaskResourceManager.deleteTask(taskName, true);
+ }
+ }
+ }
+ if (!ignoreActiveTasks && !pendingTasks.isEmpty()) {
+ throw new ControllerApplicationException(LOGGER, "The table has " + pendingTasks.size() + " active running tasks "
+ + ": " + pendingTasks + ". The task schedules have been cleared, so new tasks should not be generated. "
+ + "Please try again once there are no more active tasks", Response.Status.BAD_REQUEST);
+ }
+ }
+
// Return true iff the table is of the expectedType based on the given
tableName and tableType. The truth table:
// tableType TableNameBuilder.getTableTypeFromTableName(tableName)
Return value
// 1. null null (i.e., table has no type suffix) true
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
index 3a15ecb30e8..1a26ddc8265 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
@@ -64,6 +64,7 @@ import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.tuner.TableConfigTunerUtils;
import org.apache.pinot.controller.util.TaskConfigUtils;
@@ -108,6 +109,9 @@ public class TableConfigsRestletResource {
@Inject
PinotHelixResourceManager _pinotHelixResourceManager;
+ @Inject
+ PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
+
@Inject
PinotTaskManager _pinotTaskManager;
@@ -192,8 +196,10 @@ public class TableConfigsRestletResource {
public ConfigSuccessResponse addConfig(
String tableConfigsStr,
@ApiParam(value = "comma separated list of validation type(s) to skip.
supported types: (ALL|TASK|UPSERT)")
- @QueryParam("validationTypesToSkip") @Nullable String typesToSkip,
@Context HttpHeaders httpHeaders,
- @Context Request request) {
+ @QueryParam("validationTypesToSkip") @Nullable String typesToSkip,
+ @ApiParam(defaultValue = "false") @QueryParam("ignoreActiveTasks")
boolean ignoreActiveTasks,
+ @Context HttpHeaders httpHeaders, @Context Request request)
+ throws Exception {
Pair<TableConfigs, Map<String, Object>> tableConfigsAndUnrecognizedProps;
try {
tableConfigsAndUnrecognizedProps =
@@ -231,9 +237,15 @@ public class TableConfigsRestletResource {
if (offlineTableConfig != null) {
tuneConfig(offlineTableConfig, schema);
+ if (!ignoreActiveTasks) {
+ PinotTableRestletResource.tableTasksValidation(offlineTableConfig,
_pinotHelixTaskResourceManager);
+ }
}
if (realtimeTableConfig != null) {
tuneConfig(realtimeTableConfig, schema);
+ if (!ignoreActiveTasks) {
+ PinotTableRestletResource.tableTasksValidation(realtimeTableConfig,
_pinotHelixTaskResourceManager);
+ }
}
try {
_pinotHelixResourceManager.addSchema(schema, false, false);
@@ -264,6 +276,8 @@ public class TableConfigsRestletResource {
rawTableName, e.getMessage()), Response.Status.BAD_REQUEST, e);
} else if (e instanceof TableAlreadyExistsException) {
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.CONFLICT, e);
+ } else if (e instanceof ControllerApplicationException) {
+ throw e;
} else {
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
}
@@ -283,7 +297,9 @@ public class TableConfigsRestletResource {
@ApiOperation(value = "Delete the TableConfigs", notes = "Delete the
TableConfigs")
public SuccessResponse deleteConfig(
@ApiParam(value = "TableConfigs name i.e. raw table name", required =
true) @PathParam("tableName")
- String tableName, @Context HttpHeaders headers) {
+ String tableName,
+ @ApiParam(defaultValue = "false") @QueryParam("ignoreActiveTasks")
boolean ignoreActiveTasks,
+ @Context HttpHeaders headers) {
try {
if (TableNameBuilder.isOfflineTableResource(tableName) ||
TableNameBuilder.isRealtimeTableResource(tableName)) {
throw new ControllerApplicationException(LOGGER, "Invalid table name:
" + tableName + ". Use raw table name.",
@@ -306,9 +322,13 @@ public class TableConfigsRestletResource {
boolean tableExists =
_pinotHelixResourceManager.hasRealtimeTable(tableName) ||
_pinotHelixResourceManager.hasOfflineTable(
tableName);
+
PinotTableRestletResource.tableTasksCleanup(TableNameBuilder.REALTIME.tableNameWithType(tableName),
+ ignoreActiveTasks, _pinotHelixResourceManager,
_pinotHelixTaskResourceManager);
// Delete whether tables exist or not
_pinotHelixResourceManager.deleteRealtimeTable(tableName);
LOGGER.info("Deleted realtime table: {}", tableName);
+
PinotTableRestletResource.tableTasksCleanup(TableNameBuilder.OFFLINE.tableNameWithType(tableName),
+ ignoreActiveTasks, _pinotHelixResourceManager,
_pinotHelixTaskResourceManager);
_pinotHelixResourceManager.deleteOfflineTable(tableName);
LOGGER.info("Deleted offline table: {}", tableName);
boolean schemaExists =
_pinotHelixResourceManager.deleteSchema(tableName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index e1620f7c935..6f1fb4c0a22 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -524,8 +524,11 @@ public class PinotHelixTaskResourceManager {
* @return List of child task configs
*/
public synchronized List<PinotTaskConfig> getSubtaskConfigs(String taskName)
{
- Collection<TaskConfig> helixTaskConfigs =
-
_taskDriver.getJobConfig(getHelixJobName(taskName)).getTaskConfigMap().values();
+ JobConfig jobConfig = _taskDriver.getJobConfig(getHelixJobName(taskName));
+ if (jobConfig == null) {
+ return List.of();
+ }
+ Collection<TaskConfig> helixTaskConfigs =
jobConfig.getTaskConfigMap().values();
List<PinotTaskConfig> taskConfigs = new
ArrayList<>(helixTaskConfigs.size());
for (TaskConfig helixTaskConfig : helixTaskConfigs) {
taskConfigs.add(PinotTaskConfig.fromHelixTaskConfig(helixTaskConfig));
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
index e464032ee8f..7af7ad37345 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
@@ -29,10 +29,15 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.task.TaskState;
import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingInfo;
import
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.core.common.MinionConstants;
@@ -50,18 +55,21 @@ import
org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.StringUtil;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.any;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -76,8 +84,8 @@ import static org.testng.Assert.fail;
public class PinotTableRestletResourceTest extends ControllerTest {
private static final String OFFLINE_TABLE_NAME = "testOfflineTable";
private static final String REALTIME_TABLE_NAME = "testRealtimeTable";
- private final TableConfigBuilder _offlineBuilder = new
TableConfigBuilder(TableType.OFFLINE);
- private final TableConfigBuilder _realtimeBuilder = new
TableConfigBuilder(TableType.REALTIME);
+ private final TableConfigBuilder _offlineBuilder =
getOfflineTableBuilder(OFFLINE_TABLE_NAME);
+ private final TableConfigBuilder _realtimeBuilder =
getRealtimeTableBuilder(REALTIME_TABLE_NAME);
private String _createTableUrl;
@BeforeClass
@@ -86,13 +94,25 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
DEFAULT_INSTANCE.setupSharedStateAndValidate();
registerMinionTasks();
_createTableUrl =
DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableCreate();
-
_offlineBuilder.setTableName(OFFLINE_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS")
- .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5");
+ }
+
+ private TableConfigBuilder getRealtimeTableBuilder(String tableName) {
+ return new TableConfigBuilder(TableType.REALTIME)
+ .setTableName(tableName)
+ .setTimeColumnName("timeColumn")
+ .setTimeType("DAYS")
+ .setRetentionTimeUnit("DAYS")
+ .setRetentionTimeValue("5")
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap());
+ }
- StreamConfig streamConfig =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs();
-
_realtimeBuilder.setTableName(REALTIME_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS")
- .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5")
- .setStreamConfigs(streamConfig.getStreamConfigsMap());
+ private TableConfigBuilder getOfflineTableBuilder(String tableName) {
+ return new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName(tableName)
+ .setTimeColumnName("timeColumn")
+ .setTimeType("DAYS")
+ .setRetentionTimeUnit("DAYS")
+ .setRetentionTimeValue("5");
}
@BeforeMethod
@@ -104,39 +124,29 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
private void registerMinionTasks() {
PinotTaskManager taskManager =
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
- taskManager.registerTaskGenerator(new BaseTaskGenerator() {
- @Override
- public String getTaskType() {
- return MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
- }
-
- @Override
- public List<PinotTaskConfig> generateTasks(List<TableConfig>
tableConfigs) {
- return List.of(new
PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, new
HashMap<>()));
- }
- });
- taskManager.registerTaskGenerator(new BaseTaskGenerator() {
- @Override
- public String getTaskType() {
- return MinionConstants.MergeRollupTask.TASK_TYPE;
- }
+ ClusterInfoAccessor clusterInfoAccessor =
Mockito.mock(ClusterInfoAccessor.class);
+ Mockito.when(clusterInfoAccessor.getClusterConfig(any())).thenReturn(null);
+
registerTaskGenerator(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
taskManager, clusterInfoAccessor);
+ registerTaskGenerator(MinionConstants.MergeRollupTask.TASK_TYPE,
taskManager, clusterInfoAccessor);
+
registerTaskGenerator(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
taskManager, clusterInfoAccessor);
+ }
- @Override
- public List<PinotTaskConfig> generateTasks(List<TableConfig>
tableConfigs) {
- return List.of(new
PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, new HashMap<>()));
- }
- });
- taskManager.registerTaskGenerator(new BaseTaskGenerator() {
+ private static void registerTaskGenerator(String taskType, PinotTaskManager
taskManager,
+ ClusterInfoAccessor clusterInfoAccessor) {
+ BaseTaskGenerator taskGenerator = new BaseTaskGenerator() {
@Override
public String getTaskType() {
- return MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE;
+ return taskType;
}
@Override
public List<PinotTaskConfig> generateTasks(List<TableConfig>
tableConfigs) {
- return List.of(new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, new
HashMap<>()));
+ return List.of(new PinotTaskConfig(taskType,
+
tableConfigs.get(0).getTaskConfig().getConfigsForTaskType(getTaskType())));
}
- });
+ };
+ taskGenerator.init(clusterInfoAccessor);
+ taskManager.registerTaskGenerator(taskGenerator);
}
@Test
@@ -881,13 +891,7 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
instanceAssignmentConfigMap.put(InstancePartitionsType.CONSUMING.name(),
getInstanceAssignmentConfig("DefaultTenant_REALTIME", 4, 2));
- TableConfig realtimeTableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
- .setServerTenant("DefaultTenant")
- .setTimeColumnName("timeColumn")
- .setTimeType("DAYS")
- .setRetentionTimeUnit("DAYS")
- .setRetentionTimeValue("5")
-
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+ TableConfig realtimeTableConfig = getRealtimeTableBuilder(tableName)
.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap)
.setNumReplicas(10)
.build();
@@ -903,13 +907,7 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
instanceAssignmentConfigMap.put(InstancePartitionsType.CONSUMING.name(),
getInstanceAssignmentConfig("DefaultTenant_REALTIME", 4, 1));
- realtimeTableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
- .setServerTenant("DefaultTenant")
- .setTimeColumnName("timeColumn")
- .setTimeType("DAYS")
- .setRetentionTimeUnit("DAYS")
- .setRetentionTimeValue("5")
-
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+ realtimeTableConfig = getRealtimeTableBuilder(tableName)
.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap)
.setNumReplicas(10)
.build();
@@ -992,23 +990,11 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
*/
private void validateTableUpdateReplicationToInvalidValue(String
rawTableName, TableType tableType) {
String tableNameWithType =
TableNameBuilder.forType(tableType).tableNameWithType(rawTableName);
- TableConfig tableConfig =
- tableType == TableType.REALTIME ? new
TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName)
- .setServerTenant("DefaultTenant")
- .setTimeColumnName("timeColumn")
- .setTimeType("DAYS")
- .setRetentionTimeUnit("DAYS")
- .setRetentionTimeValue("5")
-
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
- .setNumReplicas(5)
- .build() : new
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName)
- .setServerTenant("DefaultTenant")
- .setTimeColumnName("timeColumn")
- .setTimeType("DAYS")
- .setRetentionTimeUnit("DAYS")
- .setRetentionTimeValue("5")
- .setNumReplicas(5)
- .build();
+ TableConfig tableConfig = (tableType == TableType.REALTIME
+ ? getRealtimeTableBuilder(rawTableName)
+ : getOfflineTableBuilder(rawTableName))
+ .setNumReplicas(5)
+ .build();
try {
sendPostRequest(_createTableUrl, tableConfig.toJsonString());
@@ -1018,23 +1004,11 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
}
private void createTableWithValidReplication(String rawTableName, TableType
tableType) {
- TableConfig tableConfig =
- tableType == TableType.REALTIME ? new
TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName)
- .setServerTenant("DefaultTenant")
- .setTimeColumnName("timeColumn")
- .setTimeType("DAYS")
- .setRetentionTimeUnit("DAYS")
- .setRetentionTimeValue("5")
-
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
- .setNumReplicas(1)
- .build() : new
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName)
- .setServerTenant("DefaultTenant")
- .setTimeColumnName("timeColumn")
- .setTimeType("DAYS")
- .setRetentionTimeUnit("DAYS")
- .setRetentionTimeValue("5")
- .setNumReplicas(1)
- .build();
+ TableConfig tableConfig = (tableType == TableType.REALTIME
+ ? getRealtimeTableBuilder(rawTableName)
+ : getOfflineTableBuilder(rawTableName))
+ .setNumReplicas(1)
+ .build();
try {
sendPostRequest(_createTableUrl, tableConfig.toJsonString());
@@ -1051,23 +1025,11 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
throws IOException {
String tableNameWithType =
TableNameBuilder.forType(tableType).tableNameWithType(rawTableName);
DEFAULT_INSTANCE.addDummySchema(rawTableName);
- TableConfig tableConfig =
- tableType == TableType.REALTIME ? new
TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName)
- .setServerTenant("DefaultTenant")
- .setTimeColumnName("timeColumn")
- .setTimeType("DAYS")
- .setRetentionTimeUnit("DAYS")
- .setRetentionTimeValue("5")
-
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
- .setNumReplicas(5)
- .build() : new
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName)
- .setServerTenant("DefaultTenant")
- .setTimeColumnName("timeColumn")
- .setTimeType("DAYS")
- .setRetentionTimeUnit("DAYS")
- .setRetentionTimeValue("5")
- .setNumReplicas(5)
- .build();
+ TableConfig tableConfig = (tableType == TableType.REALTIME
+ ? getRealtimeTableBuilder(rawTableName)
+ : getOfflineTableBuilder(rawTableName))
+ .setNumReplicas(5)
+ .build();
try {
sendPostRequest(_createTableUrl, tableConfig.toJsonString());
@@ -1090,6 +1052,195 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.name(),
false);
}
+ @Test
+ public void testTableTasksValidationWithNoDanglingTasks()
+ throws Exception {
+ String tableName = "testTableTasksValidation";
+ DEFAULT_INSTANCE.addDummySchema(tableName);
+
+ TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
+ .setTaskConfig(new TableTaskConfig(ImmutableMap.of(
+ MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
ImmutableMap.of())))
+ .build();
+
+ // Should succeed when no dangling tasks exist
+ String creationResponse = sendPostRequest(_createTableUrl,
offlineTableConfig.toJsonString());
+ assertEquals(creationResponse,
+ "{\"unrecognizedProperties\":{},\"status\":\"Table testTableTasksValidation_OFFLINE successfully added\"}");
+
+ // Clean up
+
sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
+ }
+
+ @Test
+ public void testTableTasksValidationWithDanglingTasks()
+ throws Exception {
+ String tableName = "testTableTasksValidationWithDangling";
+ DEFAULT_INSTANCE.addDummySchema(tableName);
+
+ TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
+ .setTaskConfig(new TableTaskConfig(ImmutableMap.of(
+ MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+ ImmutableMap.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *",
+ CommonConstants.TABLE_NAME, tableName + "_OFFLINE"))))
+ .build();
+
+ // First create the table successfully
+ sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+
+ // Create a task manually to simulate dangling task
+ PinotTaskManager taskManager =
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
+ TaskSchedulingContext context = new TaskSchedulingContext();
+ context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
+ Map<String, TaskSchedulingInfo> taskInfo =
taskManager.scheduleTasks(context);
+ String taskName =
taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
+ waitForTaskState(taskName, TaskState.IN_PROGRESS);
+
+ // Now try to create another table with same name (simulating re-creation
with dangling tasks)
+ sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder()
+ .forTableDelete(tableName + "?ignoreActiveTasks=true"));
+
+ try {
+ sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+ fail("Table creation should fail when dangling tasks exist");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("The table has dangling task data"));
+ }
+
+ // Clean up any remaining tasks
+ try {
+ sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder()
+ .forTableDelete(tableName + "?ignoreActiveTasks=true"));
+ } catch (Exception ignored) {
+ // Ignore if table doesn't exist
+ }
+ }
+
+ @Test
+ public void testTableTasksValidationWithNullTaskConfig()
+ throws Exception {
+ String tableName = "testTableTasksValidationNullConfig";
+ DEFAULT_INSTANCE.addDummySchema(tableName);
+
+ TableConfig offlineTableConfig =
getOfflineTableBuilder(tableName).build(); // No task config
+
+ // Should succeed when task config is null
+ String creationResponse = sendPostRequest(_createTableUrl,
offlineTableConfig.toJsonString());
+ assertEquals(creationResponse, "{\"unrecognizedProperties\":{},"
+ + "\"status\":\"Table testTableTasksValidationNullConfig_OFFLINE successfully added\"}");
+
+ // Clean up
+
sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
+ }
+
+ @Test
+ public void testTableTasksCleanupWithNonActiveTasks()
+ throws Exception {
+ String tableName = "testTableTasksCleanup";
+ DEFAULT_INSTANCE.addDummySchema(tableName);
+
+ TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
+ .setTaskConfig(new TableTaskConfig(ImmutableMap.of(
+ MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+ ImmutableMap.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *",
+ CommonConstants.TABLE_NAME, tableName + "_OFFLINE"))))
+ .build();
+
+ // Create table
+ sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+
+ // Create some completed tasks
+ PinotTaskManager taskManager =
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
+ TaskSchedulingContext context = new TaskSchedulingContext();
+ context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
+ Map<String, TaskSchedulingInfo> taskInfo =
taskManager.scheduleTasks(context);
+ String taskName =
taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
+ waitForTaskState(taskName, TaskState.IN_PROGRESS);
+
+ // stop the task queue to abort the task
+ sendPutRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder()
+
.forStopMinionTaskQueue(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+ waitForTaskState(taskName, TaskState.STOPPED);
+ // resume the task queue again to avoid affecting other tests
+ sendPutRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder()
+
.forResumeMinionTaskQueue(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+
+ // Delete table - should succeed and clean up tasks
+ String deleteResponse = sendDeleteRequest(
+
DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
+ assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableName +
"_OFFLINE] deleted\"}");
+ }
+
+ private static void waitForTaskState(String taskName, TaskState
expectedState) {
+ TestUtils.waitForCondition((aVoid) -> {
+ String response;
+ try {
+ response =
sendGetRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forMinionTaskState(taskName));
+ } catch (IOException e) {
+ return false;
+ }
+ return response.replace("\"", "").equals(expectedState.name());
+ }, 5000, "Task not scheduled");
+ }
+
+ @Test
+ public void testTableTasksCleanupWithActiveTasks()
+ throws Exception {
+ String tableName = "testTableTasksCleanupActive";
+ DEFAULT_INSTANCE.addDummySchema(tableName);
+
+ TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
+ .setTaskConfig(new TableTaskConfig(ImmutableMap.of(
+ MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+ ImmutableMap.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *",
+ CommonConstants.TABLE_NAME, tableName + "_OFFLINE"))))
+ .build();
+
+ // Create table
+ sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+
+ // Create an active/in-progress task
+ PinotTaskManager taskManager =
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
+ TaskSchedulingContext context = new TaskSchedulingContext();
+ context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
+ Map<String, TaskSchedulingInfo> taskInfo =
taskManager.scheduleTasks(context);
+ String taskName =
taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
+ waitForTaskState(taskName, TaskState.IN_PROGRESS);
+ try {
+ // Try to delete table without ignoring active tasks - should fail
+
sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
+ fail("Table deletion should fail when active tasks exist");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("The table has") &&
e.getMessage().contains("active running tasks"));
+ }
+
+ // Delete table with ignoreActiveTasks flag - should succeed
+ String deleteResponse = sendDeleteRequest(
+
DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName +
"?ignoreActiveTasks=true"));
+ assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableName +
"_OFFLINE] deleted\"}");
+
+ // delete task
+
sendDeleteRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forDeleteMinionTask(taskName)
+ + "?forceDelete=true");
+ }
+
+ @Test
+ public void testTableTasksCleanupWithNullTaskConfig()
+ throws Exception {
+ String tableName = "testTableTasksCleanupNullConfig";
+ DEFAULT_INSTANCE.addDummySchema(tableName);
+
+ TableConfig offlineTableConfig =
getOfflineTableBuilder(tableName).build(); // No task config
+
+ // Create table
+ sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString());
+
+ // Delete table - should succeed even with null task config
+ String deleteResponse = sendDeleteRequest(
+
DEFAULT_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
+ assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableName +
"_OFFLINE] deleted\"}");
+ }
+
@AfterMethod
public void cleanUp()
throws IOException {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 0fad49e6407..cb5de2956ac 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -118,6 +118,22 @@ public class ControllerRequestURLBuilder {
+ "&type=" + tableType);
}
+ public String forMinionTaskState(String taskName) {
+ return StringUtil.join("/", _baseUrl, "tasks", "task", taskName, "state");
+ }
+
+ public String forDeleteMinionTask(String taskName) {
+ return StringUtil.join("/", _baseUrl, "tasks", "task", taskName);
+ }
+
+ public String forStopMinionTaskQueue(String taskType) {
+ return StringUtil.join("/", _baseUrl, "tasks", taskType, "stop");
+ }
+
+ public String forResumeMinionTaskQueue(String taskType) {
+ return StringUtil.join("/", _baseUrl, "tasks", taskType, "resume");
+ }
+
public String forUpdateUserConfig(String username, String componentTypeStr,
boolean passwordChanged) {
StringBuilder params = new StringBuilder();
if (StringUtils.isNotBlank(username)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]