kfaraz commented on code in PR #13369:
URL: https://github.com/apache/druid/pull/13369#discussion_r1034851486


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -446,6 +449,140 @@ public LockResult tryLock(final Task task, final 
LockRequest request)
     }
   }
 
+  /**
+   * Attempts to allocate segments for the given requests. Each request 
contains
+   * a {@link Task} and a {@link SegmentAllocateAction}. This method tries to
+   * acquire the task locks on the required intervals/segments and then 
performs
+   * a batch allocation of segments. It is possible that some requests succeed
+   * successfully and others failed. In that case, only the failed ones should 
be
+   * retried.
+   *
+   * @param requests                List of allocation requests
+   * @param dataSource              Datasource for which segment is to be 
allocated.
+   * @param interval                Interval for which segment is to be 
allocated.
+   * @param skipSegmentLineageCheck Whether lineage check is to be skipped
+   *                                (this is true for streaming ingestion)
+   * @param lockGranularity         Granularity of task lock
+   * @return List of allocation results in the same order as the requests.
+   */
+  public List<SegmentAllocateResult> allocateSegments(
+      List<SegmentAllocateRequest> requests,
+      String dataSource,
+      Interval interval,
+      boolean skipSegmentLineageCheck,
+      LockGranularity lockGranularity
+  )
+  {
+    log.info("Allocating [%d] segments for datasource [%s], interval [%s]", 
requests.size(), dataSource, interval);
+    final boolean isTimeChunkLock = lockGranularity == 
LockGranularity.TIME_CHUNK;
+
+    final AllocationHolderList holderList = new AllocationHolderList(requests, 
interval);
+    holderList.getPending().forEach(this::verifyTaskIsActive);
+
+    giant.lock();
+    try {
+      if (isTimeChunkLock) {
+        holderList.getPending().forEach(holder -> acquireTaskLock(holder, 
true));
+        allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, 
holderList.getPending());
+      } else {
+        allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, 
holderList.getPending());
+        holderList.getPending().forEach(holder -> acquireTaskLock(holder, 
false));
+      }
+
+      // TODO: for failed allocations, cleanup newly created locks from the 
posse map

Review Comment:
   Yeah, let me check this. If not done, this might leave some garbage locks in 
the posse map.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to