abhishekagarwal87 commented on code in PR #15039:
URL: https://github.com/apache/druid/pull/15039#discussion_r1356595435


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -957,6 +959,62 @@ private Set<ReplaceTaskLock> getNonRevokedReplaceLocks(List<TaskLockPosse> posse
     return replaceLocks;
   }
 
+  /**
+   * @param conflictingLockRequests Requests for conflicting lock intervals for various datasources
+   * @return Map from datasource to intervals locked by tasks that have a conflicting lock type that cannot be revoked
+   */
+  public Map<String, List<Interval>> getConflictingLockIntervals(List<ConflictingLockRequest> conflictingLockRequests)
+  {
+    final Map<String, Set<Interval>> datasourceToIntervals = new HashMap<>();
+
+    // Take a lock and populate the maps
+    giant.lock();
+
+    try {
+      conflictingLockRequests.forEach(
+          conflictingLockRequest -> {
+            final String datasource = conflictingLockRequest.getDatasource();
+            final int priority = conflictingLockRequest.getPriority();
+            final boolean ignoreAppendLocks =
+                TaskLockType.REPLACE.name().equals(conflictingLockRequest.getContext().get(Tasks.TASK_LOCK_TYPE));
+            if (!running.containsKey(datasource)) {
+              return;
+            }

Review Comment:
   This check could be moved up so that it runs before `priority` and `ignoreAppendLocks` are computed.
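
A minimal sketch of the reordering, using simplified stand-in types rather than the real `TaskLockbox` fields. The `Request` class, the `running` set, and the `"taskLockType"` key are assumptions made for illustration; only the order of operations is the point.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class EarlyReturnSketch
{
  // Hypothetical stand-in for the context key referenced as Tasks.TASK_LOCK_TYPE.
  static final String TASK_LOCK_TYPE_KEY = "taskLockType";

  /** Hypothetical stand-in for the request type; not the real Druid class. */
  static final class Request
  {
    final String datasource;
    final Map<String, ?> context;

    Request(String datasource, Map<String, ?> context)
    {
      this.datasource = datasource;
      this.context = context;
    }
  }

  /** Returns, per running datasource, whether append locks would be ignored. */
  static Map<String, Boolean> collectIgnoreAppendFlags(Set<String> running, List<Request> requests)
  {
    final Map<String, Boolean> result = new HashMap<>();
    for (Request request : requests) {
      final String datasource = request.datasource;
      // Check moved up: skip the datasource before any other field is read.
      if (!running.contains(datasource)) {
        continue;
      }
      final boolean ignoreAppendLocks = "REPLACE".equals(request.context.get(TASK_LOCK_TYPE_KEY));
      result.put(datasource, ignoreAppendLocks);
    }
    return result;
  }
}
```

With the check first, a request for a datasource that has no running tasks is skipped without its context being touched at all.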



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java:
##########
@@ -274,6 +276,20 @@ public Response getDatasourceLockedIntervals(Map<String, Integer> minTaskPriorit
     return Response.ok(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)).build();
   }
 
+  @POST
+  @Path("/lockedIntervals/v2")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(StateResourceFilter.class)
+  public Response getDatasourceLockedIntervalsV2(List<LockFilterPolicy> lockFilterPolicies)

Review Comment:
   ```suggestion
     public Response getDatasourceLockedIntervals(List<LockFilterPolicy> lockFilterPolicies)
   ```
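
Dropping the `V2` suffix relies on ordinary Java overloading, assuming the existing `Map`-based method stays alongside the new one. A minimal sketch with hypothetical stand-in methods (the `String` return values and the `List<String>` parameter are placeholders, not the real signatures):

```java
import java.util.List;
import java.util.Map;

class LockedIntervalsOverloadSketch
{
  /** Stand-in for the existing priority-based method. */
  static String getLockedIntervals(Map<String, Integer> minTaskPriority)
  {
    return "by-priority";
  }

  /** Stand-in for the new filter-based method, sharing the same name. */
  static String getLockedIntervals(List<String> lockFilterPolicies)
  {
    return "by-filter";
  }
}
```

The compiler picks the overload from the static type of the argument, so callers of the old method are unaffected by the rename.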



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -247,6 +265,38 @@ public boolean checkPointDataSourceMetadata(
     return false;
   }
 
+  /**
+   * Registers a new version of the given pending segment on a supervisor. This
+   * allows the supervisor to include the pending segment in queries fired against
+   * that segment version.
+   */
+  public boolean registerNewVersionOfPendingSegmentOnSupervisor(
+      String supervisorId,
+      SegmentIdWithShardSpec basePendingSegment,
+      SegmentIdWithShardSpec newSegmentVersion
+  )
+  {
+    try {
+      Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null");
+      Preconditions.checkNotNull(basePendingSegment, "rootPendingSegment cannot be null");
+      Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion cannot be null");
+
+      Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
+      Preconditions.checkNotNull(supervisor, "supervisor could not be found");
+      if (!(supervisor.lhs instanceof SeekableStreamSupervisor)) {
+        return false;
+      }
+
+      SeekableStreamSupervisor<?, ?, ?> seekableStreamSupervisor = (SeekableStreamSupervisor<?, ?, ?>) supervisor.lhs;
+      seekableStreamSupervisor.registerNewVersionOfPendingSegment(basePendingSegment, newSegmentVersion);
+      return true;
+    }
+    catch (Exception e) {
+      log.error(e, "Pending segment mapping update request failed");

Review Comment:
   Please include the argument details in this log message, such as the supervisor ID, the pending segment, and the new segment version.
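
A minimal sketch of such a message, built with plain `String.format` so it is self-contained. The helper name, the bracketed layout, and the exact wording are assumptions; in the PR the resulting text would presumably go to the existing `log.error(e, ...)` call.

```java
class PendingSegmentLogSketch
{
  /**
   * Builds a failure message that names every argument of the request.
   * Hypothetical helper for illustration; not part of the PR.
   */
  static String failureMessage(String supervisorId, Object basePendingSegment, Object newSegmentVersion)
  {
    return String.format(
        "Pending segment mapping update request failed for supervisor[%s], "
        + "basePendingSegment[%s], newSegmentVersion[%s]",
        supervisorId,
        basePendingSegment,
        newSegmentVersion
    );
  }
}
```

Because the arguments are formatted with `%s`, a `null` value is printed as `null` rather than causing a second failure inside the `catch` block.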



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java:
##########
@@ -60,6 +61,15 @@ public List<Task> getActiveTasks()
     return storage.getActiveTasks();
   }
 
+  /**
+   * @param lockFilterPolicies Requests for conflicting lock intervals for various datasources
+   * @return Map from datasource to intervals locked by tasks that have a conflicting lock type that cannot be revoked
+   */
+  public Map<String, List<Interval>> getLockedIntervalsV2(List<LockFilterPolicy> lockFilterPolicies)

Review Comment:
   ```suggestion
     public Map<String, List<Interval>> getLockedIntervals(List<LockFilterPolicy> lockFilterPolicies)
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java:
##########
@@ -274,6 +276,20 @@ public Response getDatasourceLockedIntervals(Map<String, Integer> minTaskPriorit
     return Response.ok(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)).build();
   }
 
+  @POST
+  @Path("/lockedIntervals/v2")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(StateResourceFilter.class)
+  public Response getDatasourceLockedIntervalsV2(List<LockFilterPolicy> lockFilterPolicies)
+  {
+    if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) {
+      return Response.status(Status.BAD_REQUEST).entity("No Datasource provided").build();

Review Comment:
   ```suggestion
         return Response.status(Status.BAD_REQUEST).entity("No filter provided").build();
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -956,6 +959,62 @@ private Set<ReplaceTaskLock> getNonRevokedReplaceLocks(List<TaskLockPosse> posse
     return replaceLocks;
   }
 
+  /**
+   * @param lockFilterPolicies Lock filters for the given datasources
+   * @return Map from datasource to intervals locked by tasks satisfying the lock filter conditions
+   */
+  public Map<String, List<Interval>> getLockedIntervalsV2(List<LockFilterPolicy> lockFilterPolicies)

Review Comment:
   ```suggestion
     public Map<String, List<Interval>> getLockedIntervals(List<LockFilterPolicy> lockFilterPolicies)
   ```
   



##########
server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java:
##########
@@ -185,15 +186,15 @@ ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(
   ListenableFuture<CloseableIterator<SupervisorStatus>> supervisorStatuses();
 
   /**
-   * Returns a list of intervals locked by higher priority tasks for each datasource.
+   * Returns a list of intervals locked by higher priority conflicting lock types
    *
-   * @param minTaskPriority Minimum task priority for each datasource. Only the intervals that are locked by tasks with
-   *                        equal or higher priority than this are returned.
-   *
-   * @return Map from datasource name to list of intervals locked by tasks that have priority greater than or equal to
-   * the {@code minTaskPriority} for that datasource.
+   * @param lockFilterPolicies List of all filters for different datasources
+   * @return Map from datasource name to list of intervals locked by tasks that have a conflicting lock type with
+   * priority greater than or equal to the {@code minTaskPriority} for that datasource.
    */
-  ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(Map<String, Integer> minTaskPriority);
+  ListenableFuture<Map<String, List<Interval>>> findLockedIntervalsV2(

Review Comment:
   ```suggestion
     ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(
   ```


