krishan1390 commented on code in PR #16631:
URL: https://github.com/apache/pinot/pull/16631#discussion_r2300650151


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -765,8 +775,12 @@ protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List
           tableTaskConfig.getConfigsForTaskType(taskType).put(MinionConstants.TRIGGERED_BY, triggeredBy);
         }
 
-        taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
+        generateWithLock(tableName, taskType, () -> {
+          taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
+          return null;
+        });
         int maxNumberOfSubTasks = taskGenerator.getMaxAllowedSubTasksPerTask();
+        List<PinotTaskConfig> finalTaskConfigs = presentTaskConfig;

Review Comment:
   Why do we need to reassign to finalTaskConfigs? Creating more variables can hurt readability: presentTaskConfig.clear() is called in the middle, which doesn't look like it affects finalTaskConfigs, but it actually does (and should).



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -722,6 +722,28 @@ public void executeAdhocTask(AdhocTaskConfig adhocTaskConfig, @Suspended AsyncRe
     }
   }
 
+  @DELETE
+  @Path("/tasks/lock/forceRelease")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.FORCE_RELEASE_TASK_GENERATION_LOCK,
+      paramName = "tableNameWithType")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authenticate(AccessType.DELETE)
+  @ApiOperation("Force releases the task generation lock for a given table and task type")
+  public SuccessResponse cleanUpTaskGenerationLock(
+      @ApiParam(value = "Task type.") @QueryParam("taskType") @Nullable
+      String taskType,
+      @ApiParam(value = "Table name (with type suffix).")
+      @QueryParam("tableNameWithType") @Nullable String tableNameWithType) {
+    boolean lockReleased = _pinotTaskManager.forceReleaseLock(tableNameWithType, taskType);
+    if (lockReleased) {
+      return new SuccessResponse("Successfully released task generation lock on table " + tableNameWithType
+          + " for task type: " + taskType);
+    } else {
+      return new SuccessResponse("Unable to release lock on table " + tableNameWithType + " for task type: " + taskType

Review Comment:
   The else block should ideally throw an exception.
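One way to follow this suggestion, sketched under assumptions: the failure branch throws an exception that the REST layer maps to an error status instead of wrapping the failure in a SuccessResponse. LockReleaseException, the in-memory lockNodes set and the 404 status are all hypothetical stand-ins, not Pinot's actual types or the status the PR must use.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for an exception that the JAX-RS layer would map to an HTTP error status.
class LockReleaseException extends RuntimeException {
  final int httpStatus;

  LockReleaseException(String message, int httpStatus) {
    super(message);
    this.httpStatus = httpStatus;
  }
}

class ForceReleaseEndpointSketch {
  // Hypothetical in-memory stand-in for the lock nodes; keys are "table/taskType".
  final Set<String> lockNodes = new HashSet<>();

  // Mirrors forceReleaseLock(): true only if an existing lock node was removed.
  boolean forceReleaseLock(String tableNameWithType, String taskType) {
    return lockNodes.remove(tableNameWithType + "/" + taskType);
  }

  // Success returns a message; failure throws instead of returning a "success" response.
  String cleanUpTaskGenerationLock(String tableNameWithType, String taskType) {
    if (!forceReleaseLock(tableNameWithType, taskType)) {
      throw new LockReleaseException("No task generation lock found on table " + tableNameWithType
          + " for task type: " + taskType, 404);
    }
    return "Successfully released task generation lock on table " + tableNameWithType
        + " for task type: " + taskType;
  }
}
```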



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -865,6 +879,64 @@ protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List
     return response.setScheduledTaskNames(submittedTaskNames);
   }
 
+  /**
+   * Runs the task generation flow only when generation lock is successfully acquired
+   *
+   * @param tableNameWithType table for which task is being generated
+   * @param taskType task type for which task is being generated
+   * @param generate generation logic
+   * @throws Exception thrown by task generation logic or if failed to acquire lock
+   */
+  public void generateWithLock(String tableNameWithType, String taskType, Callable<Void> generate)
+      throws Exception {
+    String lockNodePath =
+        ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(tableNameWithType, taskType);
+    Stat stat = new Stat();
+    HelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+    ZNRecord znRecord = propertyStore.get(lockNodePath, stat, AccessOption.PERSISTENT);
+    // If lock node already exists check if it has gone beyond TTL
+    if (znRecord != null) {
+      long acquiredAt = znRecord.getLongField(ACQUIRED_AT, 0);
+      if (acquiredAt + TASK_GENERATION_LOCK_TTL > System.currentTimeMillis()) {
+        throw new RuntimeException("Unable to acquire task generation lock on table " + tableNameWithType
+            + " for task type " + taskType);
+      } else {
+        propertyStore.remove(lockNodePath, AccessOption.PERSISTENT);

Review Comment:
   Let's log when we're removing the lock forcefully.
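A minimal sketch of logging the TTL-based takeover before the node is removed. It uses java.util.logging to stay dependency-free (the PR's LOGGER uses SLF4J-style {} placeholders), and the map-backed store, the TTL value and removeIfExpired are hypothetical; the comparison mirrors acquiredAt + TASK_GENERATION_LOCK_TTL > System.currentTimeMillis() from the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

class ExpiredLockCleanupSketch {
  private static final Logger LOGGER = Logger.getLogger(ExpiredLockCleanupSketch.class.getName());

  // Hypothetical TTL; the actual TASK_GENERATION_LOCK_TTL value is not shown in the diff.
  static final long TASK_GENERATION_LOCK_TTL_MS = 60_000L;

  // Hypothetical stand-in for the property store: lock node path -> acquiredAt (epoch millis).
  final Map<String, Long> lockNodes = new HashMap<>();

  /** Returns true if an expired lock was found and forcefully removed. */
  boolean removeIfExpired(String lockNodePath, long nowMs) {
    Long acquiredAt = lockNodes.get(lockNodePath);
    if (acquiredAt == null || acquiredAt + TASK_GENERATION_LOCK_TTL_MS > nowMs) {
      return false; // no lock at all, or the lock is still within its TTL
    }
    // Log before the forced removal so it can be correlated with a stuck or crashed generation run.
    LOGGER.warning(String.format("Forcefully removing expired task generation lock %s (acquired at %d, TTL %d ms)",
        lockNodePath, acquiredAt, TASK_GENERATION_LOCK_TTL_MS));
    lockNodes.remove(lockNodePath);
    return true;
  }
}
```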



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -865,6 +879,64 @@ protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List
     return response.setScheduledTaskNames(submittedTaskNames);
   }
 
+  /**
+   * Runs the task generation flow only when generation lock is successfully acquired
+   *
+   * @param tableNameWithType table for which task is being generated
+   * @param taskType task type for which task is being generated
+   * @param generate generation logic
+   * @throws Exception thrown by task generation logic or if failed to acquire lock
+   */
+  public void generateWithLock(String tableNameWithType, String taskType, Callable<Void> generate)
+      throws Exception {
+    String lockNodePath =
+        ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(tableNameWithType, taskType);
+    Stat stat = new Stat();
+    HelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+    ZNRecord znRecord = propertyStore.get(lockNodePath, stat, AccessOption.PERSISTENT);
+    // If lock node already exists check if it has gone beyond TTL
+    if (znRecord != null) {
+      long acquiredAt = znRecord.getLongField(ACQUIRED_AT, 0);
+      if (acquiredAt + TASK_GENERATION_LOCK_TTL > System.currentTimeMillis()) {
+        throw new RuntimeException("Unable to acquire task generation lock on table " + tableNameWithType

Review Comment:
   The user shouldn't need to know about locks or other implementation details.
   
   I think we should instead return a message saying that task generation for this table and task type is already in progress, concurrent generation isn't allowed, and the caller should retry after some time.
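A possible shape for the user-facing error, assuming a dedicated exception type (TaskGenerationInProgressException and checkNotInProgress are hypothetical names): the message describes what the caller can do and never mentions locks or ZooKeeper nodes.

```java
// Hypothetical exception type; the message avoids exposing locks or other internals.
class TaskGenerationInProgressException extends RuntimeException {
  TaskGenerationInProgressException(String tableNameWithType, String taskType) {
    super("Task generation for table " + tableNameWithType + " and task type " + taskType
        + " is already in progress and concurrent generation is not allowed. Please retry after some time.");
  }
}

class UserFacingMessageSketch {
  // Hypothetical check mirroring the TTL comparison in generateWithLock().
  static void checkNotInProgress(String tableNameWithType, String taskType, long acquiredAtMs, long ttlMs,
      long nowMs) {
    if (acquiredAtMs + ttlMs > nowMs) {
      throw new TaskGenerationInProgressException(tableNameWithType, taskType);
    }
  }
}
```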



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -865,6 +879,64 @@ protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List
     return response.setScheduledTaskNames(submittedTaskNames);
   }
 
+  /**
+   * Runs the task generation flow only when generation lock is successfully acquired
+   *
+   * @param tableNameWithType table for which task is being generated
+   * @param taskType task type for which task is being generated
+   * @param generate generation logic
+   * @throws Exception thrown by task generation logic or if failed to acquire lock
+   */
+  public void generateWithLock(String tableNameWithType, String taskType, Callable<Void> generate)
+      throws Exception {
+    String lockNodePath =
+        ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(tableNameWithType, taskType);
+    Stat stat = new Stat();
+    HelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+    ZNRecord znRecord = propertyStore.get(lockNodePath, stat, AccessOption.PERSISTENT);
+    // If lock node already exists check if it has gone beyond TTL
+    if (znRecord != null) {
+      long acquiredAt = znRecord.getLongField(ACQUIRED_AT, 0);
+      if (acquiredAt + TASK_GENERATION_LOCK_TTL > System.currentTimeMillis()) {
+        throw new RuntimeException("Unable to acquire task generation lock on table " + tableNameWithType
+            + " for task type " + taskType);
+      } else {
+        propertyStore.remove(lockNodePath, AccessOption.PERSISTENT);
+      }
+    }
+
+    // Create new lock node
+    znRecord = new ZNRecord(String.valueOf(UUID.randomUUID()));
+    znRecord.setLongField(ACQUIRED_AT, System.currentTimeMillis());
+    if (propertyStore.create(lockNodePath, znRecord, AccessOption.PERSISTENT)) {
+      LOGGER.info("Acquired task generation lock on table {} for task type {} with id {}", tableNameWithType, taskType,
+          znRecord.getId());
+      try {
+        generate.call();
+      } finally {
+        // release lock by deleting the lock node
+        propertyStore.remove(lockNodePath, AccessOption.PERSISTENT);
+      }
+    } else {
+      throw new RuntimeException("Unable to acquire task generation lock on table " + tableNameWithType
+          + " for task type " + taskType);
+    }
+  }
+
+  /**
+   * Utility to release the lock forcefully.
+   * This does not ensure the cleanup of ongoing task generation and should be used with caution.
+   * @param tableNameWithType table to release the lock from
+   * @param taskType task type to release the lock from
+   * @return true if existing lock was released, false if lock was missing.
+   */
+  public boolean forceReleaseLock(String tableNameWithType, String taskType) {
+    String lockNodePath =
+        ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(tableNameWithType, taskType);
+    // release lock by deleting the lock node
+    return _pinotHelixResourceManager.getPropertyStore().remove(lockNodePath, AccessOption.PERSISTENT);

Review Comment:
   Let's log this.
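A sketch of logging both outcomes of forceReleaseLock, since a forced release bypasses the normal acquire/release flow. The set-based store and the path layout are hypothetical stand-ins for the property store and constructPropertyStorePathForMinionTaskGenerationLock; java.util.logging replaces the PR's SLF4J logger to keep it self-contained.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Logger;

class ForceReleaseLoggingSketch {
  private static final Logger LOGGER = Logger.getLogger(ForceReleaseLoggingSketch.class.getName());

  // Hypothetical stand-in for the lock nodes in the property store.
  final Set<String> lockNodes = new HashSet<>();

  boolean forceReleaseLock(String tableNameWithType, String taskType) {
    String lockNodePath = tableNameWithType + "/" + taskType; // hypothetical path layout
    boolean released = lockNodes.remove(lockNodePath);
    // Log both outcomes so every forced release attempt leaves a trace in the controller log.
    if (released) {
      LOGGER.warning("Force released task generation lock " + lockNodePath + " on table " + tableNameWithType
          + " for task type " + taskType);
    } else {
      LOGGER.info("No task generation lock found at " + lockNodePath + " to force release");
    }
    return released;
  }
}
```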



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -865,6 +879,64 @@ protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List
     return response.setScheduledTaskNames(submittedTaskNames);
   }
 
+  /**
+   * Runs the task generation flow only when generation lock is successfully acquired
+   *
+   * @param tableNameWithType table for which task is being generated
+   * @param taskType task type for which task is being generated
+   * @param generate generation logic
+   * @throws Exception thrown by task generation logic or if failed to acquire lock
+   */
+  public void generateWithLock(String tableNameWithType, String taskType, Callable<Void> generate)
+      throws Exception {
+    String lockNodePath =
+        ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(tableNameWithType, taskType);
+    Stat stat = new Stat();
+    HelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+    ZNRecord znRecord = propertyStore.get(lockNodePath, stat, AccessOption.PERSISTENT);
+    // If lock node already exists check if it has gone beyond TTL
+    if (znRecord != null) {
+      long acquiredAt = znRecord.getLongField(ACQUIRED_AT, 0);
+      if (acquiredAt + TASK_GENERATION_LOCK_TTL > System.currentTimeMillis()) {
+        throw new RuntimeException("Unable to acquire task generation lock on table " + tableNameWithType
+            + " for task type " + taskType);
+      } else {
+        propertyStore.remove(lockNodePath, AccessOption.PERSISTENT);
+      }
+    }
+
+    // Create new lock node
+    znRecord = new ZNRecord(String.valueOf(UUID.randomUUID()));
+    znRecord.setLongField(ACQUIRED_AT, System.currentTimeMillis());
+    if (propertyStore.create(lockNodePath, znRecord, AccessOption.PERSISTENT)) {
+      LOGGER.info("Acquired task generation lock on table {} for task type {} with id {}", tableNameWithType, taskType,
+          znRecord.getId());
+      try {
+        generate.call();
+      } finally {
+        // release lock by deleting the lock node
+        propertyStore.remove(lockNodePath, AccessOption.PERSISTENT);
+      }
+    } else {
+      throw new RuntimeException("Unable to acquire task generation lock on table " + tableNameWithType

Review Comment:
   Same as earlier:
   
   I think we should return a message saying that task generation for this table and task type is already in progress, concurrent generation isn't allowed, and the caller should retry after some time.
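This second throw site is reached when create() fails, for example because another controller created the node between the TTL check and the create call, so it deserves the same user-facing message. The sketch below assumes an atomic create-if-absent, modelled with ConcurrentMap.putIfAbsent (a stand-in, not the Helix API), and shows a second caller being rejected while the first is still generating. CreateRaceSketch and its path layout are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class CreateRaceSketch {
  // Hypothetical stand-in for the property store; putIfAbsent models an atomic "create only if absent".
  final ConcurrentMap<String, String> lockNodes = new ConcurrentHashMap<>();

  // True if this caller created the node, false if another caller already holds it.
  boolean tryCreate(String lockNodePath, String ownerId) {
    return lockNodes.putIfAbsent(lockNodePath, ownerId) == null;
  }

  void generateOrReject(String tableNameWithType, String taskType, String ownerId, Runnable generate) {
    String lockNodePath = tableNameWithType + "/" + taskType; // hypothetical path layout
    if (!tryCreate(lockNodePath, ownerId)) {
      // Same user-facing wording as the TTL branch; no mention of locks.
      throw new IllegalStateException("Task generation for table " + tableNameWithType + " and task type "
          + taskType + " is already in progress and concurrent generation is not allowed. Please retry after some time.");
    }
    try {
      generate.run();
    } finally {
      lockNodes.remove(lockNodePath, ownerId); // removes the node only if this caller still owns it
    }
  }
}
```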



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -865,6 +879,64 @@ protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List
     return response.setScheduledTaskNames(submittedTaskNames);
   }
 
+  /**
+   * Runs the task generation flow only when generation lock is successfully acquired
+   *
+   * @param tableNameWithType table for which task is being generated
+   * @param taskType task type for which task is being generated
+   * @param generate generation logic
+   * @throws Exception thrown by task generation logic or if failed to acquire lock
+   */
+  public void generateWithLock(String tableNameWithType, String taskType, Callable<Void> generate)

Review Comment:
   This should be private, or package-private if we want to add tests for it. The tests can be moved to the PinotTaskManagerTest class so that all tests of this class are discovered there, and then the method doesn't need to be public either.
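A minimal sketch of the visibility change: with no access modifier the method is package-private, so a test class in the same package (such as PinotTaskManagerTest) can still call it while callers outside the package cannot. TaskManagerSketch and TaskManagerSketchTest are hypothetical stand-ins; if desired, Guava's @VisibleForTesting annotation can document why the method is not private.

```java
import java.util.concurrent.Callable;

// Hypothetical reduced manager. No access modifier means package-private: visible to
// classes in the same package (such as a PinotTaskManagerTest), hidden from everything else.
class TaskManagerSketch {
  int generations = 0;

  void generateWithLock(String tableNameWithType, String taskType, Callable<Void> generate) throws Exception {
    generations++; // stands in for the real acquire / generate / release sequence
    generate.call();
  }
}

// Stand-in for a test class that lives in the same package as TaskManagerSketch.
class TaskManagerSketchTest {
  static boolean callsPackagePrivateMethod() {
    TaskManagerSketch manager = new TaskManagerSketch();
    boolean[] ran = {false};
    try {
      manager.generateWithLock("myTable_OFFLINE", "MergeRollupTask", () -> {
        ran[0] = true;
        return null;
      });
    } catch (Exception e) {
      return false;
    }
    return ran[0] && manager.generations == 1;
  }
}
```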



-- 
This is an automated message from the Apache Git Service.