AmatyaAvadhanula commented on code in PR #16834:
URL: https://github.com/apache/druid/pull/16834#discussion_r1706782120
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2482,43 +2482,72 @@ private void verifyAndMergeCheckpoints(final TaskGroup
taskGroup)
);
}
- private void addDiscoveredTaskToPendingCompletionTaskGroups(
+ @VisibleForTesting
+ protected void addDiscoveredTaskToPendingCompletionTaskGroups(
int groupId,
String taskId,
Map<PartitionIdType, SequenceOffsetType> startingPartitions
)
{
- final CopyOnWriteArrayList<TaskGroup> taskGroupList =
pendingCompletionTaskGroups.computeIfAbsent(
+ final CopyOnWriteArrayList<TaskGroup> taskGroupList =
pendingCompletionTaskGroups.compute(
groupId,
- k -> new CopyOnWriteArrayList<>()
+ (k, val) -> {
+ // Creating new pending completion task groups while compute so that
read and writes are locked.
+ // To ensure synchronisatoin across threads, we need to do updates
in compute so that we get only one task group for all replica tasks
+ if (val == null) {
+ val = new CopyOnWriteArrayList<>();
+ }
+
+ boolean isTaskGroupPresent = false;
+ for (TaskGroup taskGroup : val) {
+ if (taskGroup.startingSequences.equals(startingPartitions)) {
+ isTaskGroupPresent = true;
+ break;
+ }
+ }
+ if (!isTaskGroupPresent) {
+ log.info("Creating new pending completion task group [%s] for
discovered task [%s].", groupId, taskId);
+
+ // reading the minimumMessageTime & maximumMessageTime from the
publishing task and setting it here is not necessary as this task cannot
+ // change to a state where it will read any more events.
+ // This is a discovered task, so it would not have been assigned
closed partitions initially.
+ TaskGroup newTaskGroup = new TaskGroup(
+ groupId,
+ ImmutableMap.copyOf(startingPartitions),
+ null,
+ Optional.absent(),
+ Optional.absent(),
+ null
+ );
+
+ newTaskGroup.tasks.put(taskId, new TaskData());
+ newTaskGroup.completionTimeout =
DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
+
+ val.add(newTaskGroup);
+ }
+ return val;
+ }
);
+
for (TaskGroup taskGroup : taskGroupList) {
if (taskGroup.startingSequences.equals(startingPartitions)) {
if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
- log.info("Added discovered task [%s] to existing pending task group
[%s]", taskId, groupId);
+ log.info(
+ "Added discovered task [%s] to existing pending completion task
group [%s]. PendingCompletionTaskGroup: %s",
+ taskId,
+ groupId,
+ taskGroup.taskIds()
+ );
}
return;
}
}
+ }
- log.info("Creating new pending completion task group [%s] for discovered
task [%s]", groupId, taskId);
-
- // reading the minimumMessageTime & maximumMessageTime from the publishing
task and setting it here is not necessary as this task cannot
- // change to a state where it will read any more events.
- // This is a discovered task, so it would not have been assigned closed
partitions initially.
- TaskGroup newTaskGroup = new TaskGroup(
- groupId,
- ImmutableMap.copyOf(startingPartitions),
- null,
- Optional.absent(),
- Optional.absent(),
- null
- );
-
- newTaskGroup.tasks.put(taskId, new TaskData());
- newTaskGroup.completionTimeout =
DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
-
- taskGroupList.add(newTaskGroup);
+ @VisibleForTesting
+ protected CopyOnWriteArrayList<TaskGroup> getPendingCompletionTaskGroups(int
group_id)
Review Comment:
I wonder if there's a way we can do this without adding this test-only
method.
Also please use `groupId` instead of `group_id`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2482,43 +2482,72 @@ private void verifyAndMergeCheckpoints(final TaskGroup
taskGroup)
);
}
- private void addDiscoveredTaskToPendingCompletionTaskGroups(
+ @VisibleForTesting
+ protected void addDiscoveredTaskToPendingCompletionTaskGroups(
int groupId,
String taskId,
Map<PartitionIdType, SequenceOffsetType> startingPartitions
)
{
- final CopyOnWriteArrayList<TaskGroup> taskGroupList =
pendingCompletionTaskGroups.computeIfAbsent(
+ final CopyOnWriteArrayList<TaskGroup> taskGroupList =
pendingCompletionTaskGroups.compute(
groupId,
- k -> new CopyOnWriteArrayList<>()
+ (k, val) -> {
+ // Creating new pending completion task groups while compute so that
read and writes are locked.
+ // To ensure synchronisatoin across threads, we need to do updates
in compute so that we get only one task group for all replica tasks
Review Comment:
Thanks for the comment
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2482,43 +2482,72 @@ private void verifyAndMergeCheckpoints(final TaskGroup
taskGroup)
);
}
- private void addDiscoveredTaskToPendingCompletionTaskGroups(
+ @VisibleForTesting
+ protected void addDiscoveredTaskToPendingCompletionTaskGroups(
int groupId,
String taskId,
Map<PartitionIdType, SequenceOffsetType> startingPartitions
)
{
- final CopyOnWriteArrayList<TaskGroup> taskGroupList =
pendingCompletionTaskGroups.computeIfAbsent(
+ final CopyOnWriteArrayList<TaskGroup> taskGroupList =
pendingCompletionTaskGroups.compute(
groupId,
- k -> new CopyOnWriteArrayList<>()
+ (k, val) -> {
+ // Creating new pending completion task groups while compute so that
read and writes are locked.
+ // To ensure synchronisatoin across threads, we need to do updates
in compute so that we get only one task group for all replica tasks
+ if (val == null) {
+ val = new CopyOnWriteArrayList<>();
+ }
+
+ boolean isTaskGroupPresent = false;
+ for (TaskGroup taskGroup : val) {
+ if (taskGroup.startingSequences.equals(startingPartitions)) {
+ isTaskGroupPresent = true;
+ break;
+ }
+ }
+ if (!isTaskGroupPresent) {
+ log.info("Creating new pending completion task group [%s] for
discovered task [%s].", groupId, taskId);
+
+ // reading the minimumMessageTime & maximumMessageTime from the
publishing task and setting it here is not necessary as this task cannot
+ // change to a state where it will read any more events.
+ // This is a discovered task, so it would not have been assigned
closed partitions initially.
+ TaskGroup newTaskGroup = new TaskGroup(
+ groupId,
+ ImmutableMap.copyOf(startingPartitions),
+ null,
+ Optional.absent(),
+ Optional.absent(),
+ null
+ );
+
+ newTaskGroup.tasks.put(taskId, new TaskData());
+ newTaskGroup.completionTimeout =
DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
+
+ val.add(newTaskGroup);
+ }
+ return val;
+ }
);
+
for (TaskGroup taskGroup : taskGroupList) {
if (taskGroup.startingSequences.equals(startingPartitions)) {
if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
- log.info("Added discovered task [%s] to existing pending task group
[%s]", taskId, groupId);
+ log.info(
+ "Added discovered task [%s] to existing pending completion task
group [%s]. PendingCompletionTaskGroup: %s",
+ taskId,
Review Comment:
Nit: The arguments after the log message (`taskId`, `groupId`, etc.) can be
on a single line. I think this is permitted only for logging calls.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]