krishan1390 commented on code in PR #16631:
URL: https://github.com/apache/pinot/pull/16631#discussion_r2300650151
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -765,8 +775,12 @@ protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List
tableTaskConfig.getConfigsForTaskType(taskType).put(MinionConstants.TRIGGERED_BY, triggeredBy);
}
- taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
+ generateWithLock(tableName, taskType, () -> {
+ taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
+ return null;
+ });
int maxNumberOfSubTasks = taskGenerator.getMaxAllowedSubTasksPerTask();
+ List<PinotTaskConfig> finalTaskConfigs = presentTaskConfig;
Review Comment:
Why do we need to reassign to finalTaskConfigs? The extra variable hurts
readability: presentTaskConfig.clear() is called in the middle, which doesn't
look like it affects finalTaskConfigs, but it actually does (and should),
because both names refer to the same list.
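To illustrate the concern, here is a minimal standalone sketch. The class name
AliasDemo and the String elements are stand-ins for illustration, not code from
the PR. It shows that assigning a list to a second variable copies only the
reference, so clearing one clears what the other sees:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; Strings stand in for PinotTaskConfig.
class AliasDemo {
  static int sizeSeenThroughAlias() {
    List<String> presentTaskConfig = new ArrayList<>(List.of("task1", "task2"));
    // Copies the reference only: both names now point at the same list object.
    List<String> finalTaskConfigs = presentTaskConfig;
    // Clearing through one name empties the list seen through the other.
    presentTaskConfig.clear();
    return finalTaskConfigs.size();
  }

  public static void main(String[] args) {
    System.out.println(sizeSeenThroughAlias());
  }
}
```

Using presentTaskConfig directly would make that dependency visible to the
reader.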
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -722,6 +722,28 @@ public void executeAdhocTask(AdhocTaskConfig adhocTaskConfig, @Suspended AsyncRe
}
}
+ @DELETE
+ @Path("/tasks/lock/forceRelease")
+ @Authorize(targetType = TargetType.TABLE, action = Actions.Table.FORCE_RELEASE_TASK_GENERATION_LOCK,
+ paramName = "tableNameWithType")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Authenticate(AccessType.DELETE)
+ @ApiOperation("Force releases the task generation lock for a given table and task type")
+ public SuccessResponse cleanUpTaskGenerationLock(
+ @ApiParam(value = "Task type.") @QueryParam("taskType") @Nullable
+ String taskType,
+ @ApiParam(value = "Table name (with type suffix).")
+ @QueryParam("tableNameWithType") @Nullable String tableNameWithType) {
+ boolean lockReleased = _pinotTaskManager.forceReleaseLock(tableNameWithType, taskType);
+ if (lockReleased) {
+ return new SuccessResponse("Successfully released task generation lock on table " + tableNameWithType
+ + " for task type: " + taskType);
+ } else {
+ return new SuccessResponse("Unable to release lock on table " + tableNameWithType + " for task type: " + taskType
Review Comment:
The else block should ideally throw an exception rather than return a
SuccessResponse.
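A rough sketch of the shape I have in mind, written outside of JAX-RS so it
stands alone. ForceReleaseSketch and LockReleaseFailedException are
hypothetical names; in the controller this would presumably use whatever
exception type the REST resources already rely on for error responses:

```java
// Hypothetical sketch; none of these names exist in the PR.
class ForceReleaseSketch {
  // Stand-in for the controller's error-response exception type.
  static class LockReleaseFailedException extends RuntimeException {
    LockReleaseFailedException(String message) {
      super(message);
    }
  }

  // Returns the success message only when the lock was actually released.
  static String cleanUp(boolean lockReleased, String tableNameWithType, String taskType) {
    if (!lockReleased) {
      // Failure is surfaced as an error instead of a success payload.
      throw new LockReleaseFailedException(
          "Unable to release lock on table " + tableNameWithType + " for task type: " + taskType);
    }
    return "Successfully released task generation lock on table " + tableNameWithType
        + " for task type: " + taskType;
  }

  public static void main(String[] args) {
    System.out.println(cleanUp(true, "myTable_OFFLINE", "SomeTaskType"));
  }
}
```

That way callers can tell success from failure by the response status rather
than by parsing the message text.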
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -865,6 +879,64 @@ protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List
return response.setScheduledTaskNames(submittedTaskNames);
}
+ /**
+ * Runs the task generation flow only when generation lock is successfully acquired
+ *
+ * @param tableNameWithType table for which task is being generated
+ * @param taskType task type for which task is being generated
+ * @param generate generation logic
+ * @throws Exception thrown by task generation logic or if failed to acquire lock
+ */
+ public void generateWithLock(String tableNameWithType, String taskType, Callable<Void> generate)
+ throws Exception {
+ String lockNodePath =
+ ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(tableNameWithType, taskType);
+ Stat stat = new Stat();
+ HelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+ ZNRecord znRecord = propertyStore.get(lockNodePath, stat, AccessOption.PERSISTENT);
+ // If lock node already exists check if it has gone beyond TTL
+ if (znRecord != null) {
+ long acquiredAt = znRecord.getLongField(ACQUIRED_AT, 0);
+ if (acquiredAt + TASK_GENERATION_LOCK_TTL > System.currentTimeMillis()) {
+ throw new RuntimeException("Unable to acquire task generation lock on table " + tableNameWithType
+ + " for task type " + taskType);
+ } else {
+ propertyStore.remove(lockNodePath, AccessOption.PERSISTENT);
Review Comment:
Let's log when we're removing the lock forcefully.
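Something along these lines, as a standalone sketch. It uses java.util.logging
as a stand-in for the class's own LOGGER, and the class name, method names, and
TTL value are all assumptions made for illustration:

```java
import java.util.logging.Logger;

// Hypothetical sketch: names and the TTL value are illustrative, not from the PR.
class ExpiredLockSketch {
  private static final Logger LOGGER = Logger.getLogger(ExpiredLockSketch.class.getName());
  static final long TASK_GENERATION_LOCK_TTL_MS = 60_000L; // assumed value

  // Mirrors the check in the diff: the lock is still held while acquiredAt + TTL > now.
  static boolean isExpired(long acquiredAtMs, long nowMs) {
    return acquiredAtMs + TASK_GENERATION_LOCK_TTL_MS <= nowMs;
  }

  // Runs the removal action only for an expired lock, and logs before doing so.
  static boolean removeIfExpired(String tableNameWithType, String taskType, long acquiredAtMs, long nowMs,
      Runnable removeLockNode) {
    if (!isExpired(acquiredAtMs, nowMs)) {
      return false;
    }
    LOGGER.warning(String.format(
        "Forcefully removing expired task generation lock on table %s for task type %s (acquired at %d)",
        tableNameWithType, taskType, acquiredAtMs));
    removeLockNode.run();
    return true;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    removeIfExpired("myTable_OFFLINE", "SomeTaskType", now - 120_000L, now, () -> { });
  }
}
```

Including the acquisition time in the log line should make it easier to tell a
crashed generator from one that is simply slow.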
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -865,6 +879,64 @@ protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List
return response.setScheduledTaskNames(submittedTaskNames);
}
+ /**
+ * Runs the task generation flow only when generation lock is successfully acquired
+ *
+ * @param tableNameWithType table for which task is being generated
+ * @param taskType task type for which task is being generated
+ * @param generate generation logic
+ * @throws Exception thrown by task generation logic or if failed to acquire lock
+ */
+ public void generateWithLock(String tableNameWithType, String taskType, Callable<Void> generate)
+ throws Exception {
+ String lockNodePath =
+ ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(tableNameWithType, taskType);
+ Stat stat = new Stat();
+ HelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+ ZNRecord znRecord = propertyStore.get(lockNodePath, stat, AccessOption.PERSISTENT);
+ // If lock node already exists check if it has gone beyond TTL
+ if (znRecord != null) {
+ long acquiredAt = znRecord.getLongField(ACQUIRED_AT, 0);
+ if (acquiredAt + TASK_GENERATION_LOCK_TTL > System.currentTimeMillis()) {
+ throw new RuntimeException("Unable to acquire task generation lock on table " + tableNameWithType
Review Comment:
The user shouldn't need to know about locks or other implementation details.
I think we should instead report that task generation for this table and task
type is already in progress, that concurrent generation isn't allowed, and that
they should retry after some time.
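For example, the message could be built roughly like this. The helper class,
method name, and exact wording are only suggestions, not existing code:

```java
// Hypothetical helper; the class, method, and wording are suggestions only.
class GenerationInProgressMessage {
  // Builds a user-facing message that avoids mentioning the lock.
  static String of(String tableNameWithType, String taskType) {
    return "Task generation for table " + tableNameWithType + " and task type " + taskType
        + " is already in progress and concurrent generation isn't allowed. Please retry after some time.";
  }

  public static void main(String[] args) {
    System.out.println(of("myTable_OFFLINE", "SomeTaskType"));
  }
}
```

The lock details can still go to the controller log for whoever is debugging.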
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -865,6 +879,64 @@ protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List
return response.setScheduledTaskNames(submittedTaskNames);
}
+ /**
+ * Runs the task generation flow only when generation lock is successfully acquired
+ *
+ * @param tableNameWithType table for which task is being generated
+ * @param taskType task type for which task is being generated
+ * @param generate generation logic
+ * @throws Exception thrown by task generation logic or if failed to acquire lock
+ */
+ public void generateWithLock(String tableNameWithType, String taskType, Callable<Void> generate)
+ throws Exception {
+ String lockNodePath =
+ ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(tableNameWithType, taskType);
+ Stat stat = new Stat();
+ HelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+ ZNRecord znRecord = propertyStore.get(lockNodePath, stat, AccessOption.PERSISTENT);
+ // If lock node already exists check if it has gone beyond TTL
+ if (znRecord != null) {
+ long acquiredAt = znRecord.getLongField(ACQUIRED_AT, 0);
+ if (acquiredAt + TASK_GENERATION_LOCK_TTL > System.currentTimeMillis()) {
+ throw new RuntimeException("Unable to acquire task generation lock on table " + tableNameWithType
+ + " for task type " + taskType);
+ } else {
+ propertyStore.remove(lockNodePath, AccessOption.PERSISTENT);
+ }
+ }
+
+ // Create new lock node
+ znRecord = new ZNRecord(String.valueOf(UUID.randomUUID()));
+ znRecord.setLongField(ACQUIRED_AT, System.currentTimeMillis());
+ if (propertyStore.create(lockNodePath, znRecord, AccessOption.PERSISTENT)) {
+ LOGGER.info("Acquired task generation lock on table {} for task type {} with id {}", tableNameWithType, taskType,
+ znRecord.getId());
+ try {
+ generate.call();
+ } finally {
+ // release lock by deleting the lock node
+ propertyStore.remove(lockNodePath, AccessOption.PERSISTENT);
+ }
+ } else {
+ throw new RuntimeException("Unable to acquire task generation lock on table " + tableNameWithType
+ + " for task type " + taskType);
+ }
+ }
+
+ /**
+ * Utility to release the lock forcefully.
+ * This does not ensure the cleanup of ongoing task generation and should be used with caution.
+ * @param tableNameWithType table to release the lock from
+ * @param taskType task type to release the lock from
+ * @return true if existing lock was released, false if lock was missing.
+ */
+ public boolean forceReleaseLock(String tableNameWithType, String taskType) {
+ String lockNodePath =
+ ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(tableNameWithType, taskType);
+ // release lock by deleting the lock node
+ return _pinotHelixResourceManager.getPropertyStore().remove(lockNodePath, AccessOption.PERSISTENT);
Review Comment:
Let's log this forced release as well.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -865,6 +879,64 @@ protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List
return response.setScheduledTaskNames(submittedTaskNames);
}
+ /**
+ * Runs the task generation flow only when generation lock is successfully acquired
+ *
+ * @param tableNameWithType table for which task is being generated
+ * @param taskType task type for which task is being generated
+ * @param generate generation logic
+ * @throws Exception thrown by task generation logic or if failed to acquire lock
+ */
+ public void generateWithLock(String tableNameWithType, String taskType, Callable<Void> generate)
+ throws Exception {
+ String lockNodePath =
+ ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(tableNameWithType, taskType);
+ Stat stat = new Stat();
+ HelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+ ZNRecord znRecord = propertyStore.get(lockNodePath, stat, AccessOption.PERSISTENT);
+ // If lock node already exists check if it has gone beyond TTL
+ if (znRecord != null) {
+ long acquiredAt = znRecord.getLongField(ACQUIRED_AT, 0);
+ if (acquiredAt + TASK_GENERATION_LOCK_TTL > System.currentTimeMillis()) {
+ throw new RuntimeException("Unable to acquire task generation lock on table " + tableNameWithType
+ + " for task type " + taskType);
+ } else {
+ propertyStore.remove(lockNodePath, AccessOption.PERSISTENT);
+ }
+ }
+
+ // Create new lock node
+ znRecord = new ZNRecord(String.valueOf(UUID.randomUUID()));
+ znRecord.setLongField(ACQUIRED_AT, System.currentTimeMillis());
+ if (propertyStore.create(lockNodePath, znRecord, AccessOption.PERSISTENT)) {
+ LOGGER.info("Acquired task generation lock on table {} for task type {} with id {}", tableNameWithType, taskType,
+ znRecord.getId());
+ try {
+ generate.call();
+ } finally {
+ // release lock by deleting the lock node
+ propertyStore.remove(lockNodePath, AccessOption.PERSISTENT);
+ }
+ } else {
+ throw new RuntimeException("Unable to acquire task generation lock on table " + tableNameWithType
Review Comment:
Same as earlier: I think we should report that task generation for this table
and task type is already in progress, that concurrent generation isn't allowed,
and that the user should retry after some time.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -865,6 +879,64 @@ protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List
return response.setScheduledTaskNames(submittedTaskNames);
}
+ /**
+ * Runs the task generation flow only when generation lock is successfully acquired
+ *
+ * @param tableNameWithType table for which task is being generated
+ * @param taskType task type for which task is being generated
+ * @param generate generation logic
+ * @throws Exception thrown by task generation logic or if failed to acquire lock
+ */
+ public void generateWithLock(String tableNameWithType, String taskType, Callable<Void> generate)
Review Comment:
This should be private, or package-private if we want to add tests for it.
The tests can be moved to the PinotTaskManagerTest class so that all tests of
this class are discovered there, and then the method doesn't need to be public.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]