hardikbajaj commented on code in PR #16834:
URL: https://github.com/apache/druid/pull/16834#discussion_r1706808723
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2482,43 +2482,72 @@ private void verifyAndMergeCheckpoints(final TaskGroup
taskGroup)
);
}
- private void addDiscoveredTaskToPendingCompletionTaskGroups(
+ @VisibleForTesting
+ protected void addDiscoveredTaskToPendingCompletionTaskGroups(
int groupId,
String taskId,
Map<PartitionIdType, SequenceOffsetType> startingPartitions
)
{
- final CopyOnWriteArrayList<TaskGroup> taskGroupList =
pendingCompletionTaskGroups.computeIfAbsent(
+ final CopyOnWriteArrayList<TaskGroup> taskGroupList =
pendingCompletionTaskGroups.compute(
groupId,
- k -> new CopyOnWriteArrayList<>()
+ (k, val) -> {
+ // Create new pending completion task groups inside compute so that
reads and writes are locked.
+ // To ensure synchronisation across threads, we need to do updates
in compute so that we get only one task group for all replica tasks.
+ if (val == null) {
+ val = new CopyOnWriteArrayList<>();
+ }
+
+ boolean isTaskGroupPresent = false;
+ for (TaskGroup taskGroup : val) {
+ if (taskGroup.startingSequences.equals(startingPartitions)) {
+ isTaskGroupPresent = true;
+ break;
+ }
+ }
+ if (!isTaskGroupPresent) {
+ log.info("Creating new pending completion task group [%s] for
discovered task [%s].", groupId, taskId);
+
+ // reading the minimumMessageTime & maximumMessageTime from the
publishing task and setting it here is not necessary as this task cannot
+ // change to a state where it will read any more events.
+ // This is a discovered task, so it would not have been assigned
closed partitions initially.
+ TaskGroup newTaskGroup = new TaskGroup(
+ groupId,
+ ImmutableMap.copyOf(startingPartitions),
+ null,
+ Optional.absent(),
+ Optional.absent(),
+ null
+ );
+
+ newTaskGroup.tasks.put(taskId, new TaskData());
+ newTaskGroup.completionTimeout =
DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
+
+ val.add(newTaskGroup);
+ }
+ return val;
+ }
);
+
for (TaskGroup taskGroup : taskGroupList) {
if (taskGroup.startingSequences.equals(startingPartitions)) {
if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
- log.info("Added discovered task [%s] to existing pending task group
[%s]", taskId, groupId);
+ log.info(
+ "Added discovered task [%s] to existing pending completion task
group [%s]. PendingCompletionTaskGroup: %s",
+ taskId,
+ groupId,
+ taskGroup.taskIds()
+ );
}
return;
}
}
+ }
- log.info("Creating new pending completion task group [%s] for discovered
task [%s]", groupId, taskId);
-
- // reading the minimumMessageTime & maximumMessageTime from the publishing
task and setting it here is not necessary as this task cannot
- // change to a state where it will read any more events.
- // This is a discovered task, so it would not have been assigned closed
partitions initially.
- TaskGroup newTaskGroup = new TaskGroup(
- groupId,
- ImmutableMap.copyOf(startingPartitions),
- null,
- Optional.absent(),
- Optional.absent(),
- null
- );
-
- newTaskGroup.tasks.put(taskId, new TaskData());
- newTaskGroup.completionTimeout =
DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
-
- taskGroupList.add(newTaskGroup);
+ @VisibleForTesting
+ protected CopyOnWriteArrayList<TaskGroup> getPendingCompletionTaskGroups(int
group_id)
Review Comment:
Since `pendingCompletionTaskGroups` is a private variable, I created this
read-only method. Without changing this to protected, I can't read it in the
child classes that we use for testing.
> Also please use groupId instead of group_id
I'll change to groupId.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]