abhishekagarwal87 commented on code in PR #15039:
URL: https://github.com/apache/druid/pull/15039#discussion_r1356595435
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -957,6 +959,62 @@ private Set<ReplaceTaskLock>
getNonRevokedReplaceLocks(List<TaskLockPosse> posse
return replaceLocks;
}
+ /**
+ * @param conflictingLockRequests Requests for conflicing lock intervals for
various datasources
+ * @return Map from datasource to intervals locked by tasks that have a
conflicting lock type that cannot be revoked
+ */
+ public Map<String, List<Interval>>
getConflictingLockIntervals(List<ConflictingLockRequest>
conflictingLockRequests)
+ {
+ final Map<String, Set<Interval>> datasourceToIntervals = new HashMap<>();
+
+ // Take a lock and populate the maps
+ giant.lock();
+
+ try {
+ conflictingLockRequests.forEach(
+ conflictingLockRequest -> {
+ final String datasource = conflictingLockRequest.getDatasource();
+ final int priority = conflictingLockRequest.getPriority();
+ final boolean ignoreAppendLocks =
+
TaskLockType.REPLACE.name().equals(conflictingLockRequest.getContext().get(Tasks.TASK_LOCK_TYPE));
+ if (!running.containsKey(datasource)) {
+ return;
+ }
Review Comment:
this check could be moved up.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java:
##########
@@ -274,6 +276,20 @@ public Response getDatasourceLockedIntervals(Map<String,
Integer> minTaskPriorit
return
Response.ok(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)).build();
}
+ @POST
+ @Path("/lockedIntervals/v2")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(StateResourceFilter.class)
+ public Response getDatasourceLockedIntervalsV2(List<LockFilterPolicy>
lockFilterPolicies)
Review Comment:
```suggestion
public Response getDatasourceLockedIntervals(List<LockFilterPolicy>
lockFilterPolicies)
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -247,6 +265,38 @@ public boolean checkPointDataSourceMetadata(
return false;
}
+ /**
+ * Registers a new version of the given pending segment on a supervisor. This
+ * allows the supervisor to include the pending segment in queries fired
against
+ * that segment version.
+ */
+ public boolean registerNewVersionOfPendingSegmentOnSupervisor(
+ String supervisorId,
+ SegmentIdWithShardSpec basePendingSegment,
+ SegmentIdWithShardSpec newSegmentVersion
+ )
+ {
+ try {
+ Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null");
+ Preconditions.checkNotNull(basePendingSegment, "rootPendingSegment
cannot be null");
+ Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion cannot
be null");
+
+ Pair<Supervisor, SupervisorSpec> supervisor =
supervisors.get(supervisorId);
+ Preconditions.checkNotNull(supervisor, "supervisor could not be found");
+ if (!(supervisor.lhs instanceof SeekableStreamSupervisor)) {
+ return false;
+ }
+
+ SeekableStreamSupervisor<?, ?, ?> seekableStreamSupervisor =
(SeekableStreamSupervisor<?, ?, ?>) supervisor.lhs;
+
seekableStreamSupervisor.registerNewVersionOfPendingSegment(basePendingSegment,
newSegmentVersion);
+ return true;
+ }
+ catch (Exception e) {
+ log.error(e, "Pending segment mapping update request failed");
Review Comment:
please add details of the argument. like supervisor id, pending segment
info, new version info
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java:
##########
@@ -60,6 +61,15 @@ public List<Task> getActiveTasks()
return storage.getActiveTasks();
}
+ /**
+ * @param lockFilterPolicies Requests for conflicing lock intervals for
various datasources
+ * @return Map from datasource to intervals locked by tasks that have a
conflicting lock type that cannot be revoked
+ */
+ public Map<String, List<Interval>>
getLockedIntervalsV2(List<LockFilterPolicy> lockFilterPolicies)
Review Comment:
```suggestion
public Map<String, List<Interval>>
getLockedIntervals(List<LockFilterPolicy> lockFilterPolicies)
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java:
##########
@@ -274,6 +276,20 @@ public Response getDatasourceLockedIntervals(Map<String,
Integer> minTaskPriorit
return
Response.ok(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)).build();
}
+ @POST
+ @Path("/lockedIntervals/v2")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(StateResourceFilter.class)
+ public Response getDatasourceLockedIntervalsV2(List<LockFilterPolicy>
lockFilterPolicies)
+ {
+ if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) {
+ return Response.status(Status.BAD_REQUEST).entity("No Datasource
provided").build();
Review Comment:
```suggestion
return Response.status(Status.BAD_REQUEST).entity("No filter
provided").build();
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -956,6 +959,62 @@ private Set<ReplaceTaskLock>
getNonRevokedReplaceLocks(List<TaskLockPosse> posse
return replaceLocks;
}
+ /**
+ * @param lockFilterPolicies Lock filters for the given datasources
+ * @return Map from datasource to intervals locked by tasks satisfying the
lock filter condititions
+ */
+ public Map<String, List<Interval>>
getLockedIntervalsV2(List<LockFilterPolicy> lockFilterPolicies)
Review Comment:
```suggestion
public Map<String, List<Interval>>
getLockedIntervals(List<LockFilterPolicy> lockFilterPolicies)
```
##########
server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java:
##########
@@ -185,15 +186,15 @@ ListenableFuture<CloseableIterator<TaskStatusPlus>>
taskStatuses(
ListenableFuture<CloseableIterator<SupervisorStatus>> supervisorStatuses();
/**
- * Returns a list of intervals locked by higher priority tasks for each
datasource.
+ * Returns a list of intervals locked by higher priority conflicting lock
types
*
- * @param minTaskPriority Minimum task priority for each datasource. Only
the intervals that are locked by tasks with
- * equal or higher priority than this are returned.
- *
- * @return Map from dtasource name to list of intervals locked by tasks that
have priority greater than or equal to
- * the {@code minTaskPriority} for that datasource.
+ * @param lockFilterPolicies List of all filters for different datasources
+ * @return Map from datasource name to list of intervals locked by tasks
that have a conflicting lock type with
+ * priority greater than or equal to the {@code minTaskPriority} for that
datasource.
*/
- ListenableFuture<Map<String, List<Interval>>>
findLockedIntervals(Map<String, Integer> minTaskPriority);
+ ListenableFuture<Map<String, List<Interval>>> findLockedIntervalsV2(
Review Comment:
```suggestion
ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]