hardikbajaj commented on code in PR #16834:
URL: https://github.com/apache/druid/pull/16834#discussion_r1706808723


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2482,43 +2482,72 @@ private void verifyAndMergeCheckpoints(final TaskGroup 
taskGroup)
     );
   }
 
-  private void addDiscoveredTaskToPendingCompletionTaskGroups(
+  @VisibleForTesting
+  protected void addDiscoveredTaskToPendingCompletionTaskGroups(
       int groupId,
       String taskId,
       Map<PartitionIdType, SequenceOffsetType> startingPartitions
   )
   {
-    final CopyOnWriteArrayList<TaskGroup> taskGroupList = 
pendingCompletionTaskGroups.computeIfAbsent(
+    final CopyOnWriteArrayList<TaskGroup> taskGroupList = 
pendingCompletionTaskGroups.compute(
         groupId,
-        k -> new CopyOnWriteArrayList<>()
+        (k, val) -> {
+          // Creating new pending completion task groups while compute so that 
read and writes are locked.
+          // To ensure synchronisatoin across threads, we need to do updates 
in compute so that we get only one task group for all replica tasks
+          if (val == null) {
+            val = new CopyOnWriteArrayList<>();
+          }
+
+          boolean isTaskGroupPresent = false;
+          for (TaskGroup taskGroup : val) {
+            if (taskGroup.startingSequences.equals(startingPartitions)) {
+              isTaskGroupPresent = true;
+              break;
+            }
+          }
+          if (!isTaskGroupPresent) {
+            log.info("Creating new pending completion task group [%s] for 
discovered task [%s].", groupId, taskId);
+
+            // reading the minimumMessageTime & maximumMessageTime from the 
publishing task and setting it here is not necessary as this task cannot
+            // change to a state where it will read any more events.
+            // This is a discovered task, so it would not have been assigned 
closed partitions initially.
+            TaskGroup newTaskGroup = new TaskGroup(
+                groupId,
+                ImmutableMap.copyOf(startingPartitions),
+                null,
+                Optional.absent(),
+                Optional.absent(),
+                null
+            );
+
+            newTaskGroup.tasks.put(taskId, new TaskData());
+            newTaskGroup.completionTimeout = 
DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
+
+            val.add(newTaskGroup);
+          }
+          return val;
+        }
     );
+
     for (TaskGroup taskGroup : taskGroupList) {
       if (taskGroup.startingSequences.equals(startingPartitions)) {
         if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
-          log.info("Added discovered task [%s] to existing pending task group 
[%s]", taskId, groupId);
+          log.info(
+              "Added discovered task [%s] to existing pending completion task 
group [%s]. PendingCompletionTaskGroup: %s",
+              taskId,
+              groupId,
+              taskGroup.taskIds()
+          );
         }
         return;
       }
     }
+  }
 
-    log.info("Creating new pending completion task group [%s] for discovered 
task [%s]", groupId, taskId);
-
-    // reading the minimumMessageTime & maximumMessageTime from the publishing 
task and setting it here is not necessary as this task cannot
-    // change to a state where it will read any more events.
-    // This is a discovered task, so it would not have been assigned closed 
partitions initially.
-    TaskGroup newTaskGroup = new TaskGroup(
-        groupId,
-        ImmutableMap.copyOf(startingPartitions),
-        null,
-        Optional.absent(),
-        Optional.absent(),
-        null
-    );
-
-    newTaskGroup.tasks.put(taskId, new TaskData());
-    newTaskGroup.completionTimeout = 
DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
-
-    taskGroupList.add(newTaskGroup);
+  @VisibleForTesting
+  protected CopyOnWriteArrayList<TaskGroup> getPendingCompletionTaskGroups(int 
group_id)

Review Comment:
   Since `pendingCompletionTaskGroups` is a private variable, I created this 
read-only method. I can't read it in the child classes which we use for testing 
too, without changing this to protected.
   > Also please use groupId instead of group_id
   
   I'll change to groupId.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to