kfaraz commented on code in PR #13328:
URL: https://github.com/apache/druid/pull/13328#discussion_r1017854427
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2276,17 +2277,35 @@ public Void apply(@Nullable Boolean result)
);
}
+ /**
+ * Determines whether a given task was created by the current version of the supervisor.
+ * Uses the Task object mapped to this taskId in the {@code activeTaskMap}.
+ * If not found in the map, fetch it from the metadata store.
+ * @param taskGroupId task group id
+ * @param taskId task id
+ * @param activeTaskMap Set of active tasks that were pre-fetched
+ * @return true if the task was created by the current supervisor
+ */
@VisibleForTesting
- public boolean isTaskCurrent(int taskGroupId, String taskId)
+ public boolean isTaskCurrent(int taskGroupId, String taskId, Map<String, Task> activeTaskMap)
{
- Optional<Task> taskOptional = taskStorage.getTask(taskId);
- if (!taskOptional.isPresent() || !doesTaskTypeMatchSupervisor(taskOptional.get())) {
- return false;
+ Task genericTask;
+ if (activeTaskMap == null || !activeTaskMap.containsKey(taskId)) {
+ Optional<Task> taskOptional = taskStorage.getTask(taskId);
+ if (!taskOptional.isPresent() || !doesTaskTypeMatchSupervisor(taskOptional.get())) {
+ return false;
+ }
+ genericTask = (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType>) taskOptional.get();
+ } else {
+ if (!doesTaskTypeMatchSupervisor(activeTaskMap.get(taskId))) {
+ return false;
+ }
+ genericTask = (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType>) activeTaskMap.get(taskId);
Review Comment:
Suggestion: I think this whole block can be simplified.
Since this should be a private method that always receives a non-null
`activeTaskMap`, you could even drop the null check.
```suggestion
    final Task genericTask;
    if (activeTaskMap != null && activeTaskMap.containsKey(taskId)) {
      genericTask = activeTaskMap.get(taskId);
    } else {
      genericTask = taskStorage.getTask(taskId).orElse(null);
    }
    if (genericTask == null || !doesTaskTypeMatchSupervisor(genericTask)) {
      return false;
    }
```
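For illustration, here is a minimal, self-contained sketch of the lookup pattern suggested above: check the pre-fetched map first, fall back to storage, then do a single null-and-type check. Everything in it is a hypothetical stand-in rather than the Druid code: `TaskLookupSketch`, the nested `Task` and `TaskStorage` interfaces, `isKnownAndMatching`, and the `"index_kafka"` type check are assumptions made only so the example compiles on its own. It also assumes `java.util.Optional`; if `taskStorage.getTask` returns a Guava `Optional`, the equivalent call would be `orNull()` instead of `orElse(null)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class TaskLookupSketch {
    // Hypothetical stand-in for a task; not Druid's Task interface.
    public interface Task {
        String getType();
    }

    // Hypothetical stand-in for the metadata-store lookup.
    public interface TaskStorage {
        Optional<Task> getTask(String taskId);
    }

    // Assumed type check; the real supervisor check is not shown in this thread.
    public static boolean doesTaskTypeMatchSupervisor(Task task) {
        return "index_kafka".equals(task.getType());
    }

    // Resolves the task from the pre-fetched map first, then from storage,
    // and applies one combined null/type check. No cast is needed because
    // both sources already yield a Task.
    public static boolean isKnownAndMatching(
            String taskId,
            Map<String, Task> activeTaskMap,
            TaskStorage taskStorage) {
        final Task genericTask;
        if (activeTaskMap.containsKey(taskId)) {
            genericTask = activeTaskMap.get(taskId);
        } else {
            genericTask = taskStorage.getTask(taskId).orElse(null);
        }
        return genericTask != null && doesTaskTypeMatchSupervisor(genericTask);
    }

    public static void main(String[] args) {
        Map<String, Task> active = new HashMap<>();
        active.put("task-in-map", () -> "index_kafka");

        Map<String, Task> stored = new HashMap<>();
        stored.put("task-in-storage", () -> "index_kafka");
        TaskStorage storage = id -> Optional.ofNullable(stored.get(id));

        System.out.println(isKnownAndMatching("task-in-map", active, storage));
        System.out.println(isKnownAndMatching("task-in-storage", active, storage));
        System.out.println(isKnownAndMatching("missing-task", active, storage));
    }
}
```

The sketch drops the null check on the map, which is only safe under the assumption stated in the comment that callers always pass a non-null `activeTaskMap`.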
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2276,17 +2277,35 @@ public Void apply(@Nullable Boolean result)
);
}
+ /**
+ * Determines whether a given task was created by the current version of the supervisor.
+ * Uses the Task object mapped to this taskId in the {@code activeTaskMap}.
+ * If not found in the map, fetch it from the metadata store.
+ * @param taskGroupId task group id
+ * @param taskId task id
+ * @param activeTaskMap Set of active tasks that were pre-fetched
+ * @return true if the task was created by the current supervisor
+ */
@VisibleForTesting
- public boolean isTaskCurrent(int taskGroupId, String taskId)
+ public boolean isTaskCurrent(int taskGroupId, String taskId, Map<String, Task> activeTaskMap)
{
- Optional<Task> taskOptional = taskStorage.getTask(taskId);
- if (!taskOptional.isPresent() || !doesTaskTypeMatchSupervisor(taskOptional.get())) {
- return false;
+ Task genericTask;
+ if (activeTaskMap == null || !activeTaskMap.containsKey(taskId)) {
+ Optional<Task> taskOptional = taskStorage.getTask(taskId);
+ if (!taskOptional.isPresent() || !doesTaskTypeMatchSupervisor(taskOptional.get())) {
+ return false;
+ }
+ genericTask = (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType>) taskOptional.get();
Review Comment:
`genericTask` is of type `Task`. You need not cast here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]