jtuglu1 commented on code in PR #19091:
URL: https://github.com/apache/druid/pull/19091#discussion_r2893939422


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2626,8 +2634,8 @@ private void verifyAndMergeCheckpoints(
           taskGroup.checkpointSequences.clear();
           taskGroup.checkpointSequences.putAll(latestCheckpoints);
         } else {
-          log.debug(
-              "Adding task[%s] to kill list, checkpoints[%s], latestoffsets from DB [%s]",
+          log.warn(
+              "Adding task[%s] to kill list as its checkpoints[%s] are not consistent with latestoffsets from DB[%s]",

Review Comment:
   nit: can we keep this `..checkpoints[%s]` so it's more predictable to grep in logs?
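The nit relies on one convention: each argument is written as `name[%s]` with no space before the bracket, so a single fixed-string search matches every variant of the kill-list message. Below is a minimal illustration, not Druid code. The class `GrepFriendlyLogFormat`, the `NEEDLE` constant, and `matchesGrep` are hypothetical helpers. Plain `String.format` stands in for the supervisor's logger, which is assumed to substitute `%s` in the same way.

```java
import java.util.Map;

public class GrepFriendlyLogFormat
{
  // Hypothetical literal a reader might pass to a fixed-string search such as `grep -F 'checkpoints['`.
  static final String NEEDLE = "checkpoints[";

  // True if a fixed-string search for NEEDLE would match this log line.
  public static boolean matchesGrep(String logLine)
  {
    return logLine.contains(NEEDLE);
  }

  public static void main(String[] args)
  {
    Map<Integer, Long> checkpoints = Map.of(0, 10L);

    // `name[%s]` with no space: every argument produces the same searchable token.
    String compact = String.format(
        "Adding task[%s] to kill list as its checkpoints[%s] are not consistent with taskgroup checkpoints[%s]",
        "task-1", checkpoints, checkpoints
    );
    // A stray space before the bracket breaks the token for that argument.
    String spaced = String.format("taskgroup checkpoints [%s]", checkpoints);

    System.out.println(matchesGrep(compact)); // prints "true"
    System.out.println(matchesGrep(spaced));  // prints "false"
  }
}
```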



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2669,18 +2677,50 @@ private void verifyAndMergeCheckpoints(
         sequenceCheckpoint -> {
           killTask(
               sequenceCheckpoint.lhs,
-              "Killing task[%s], as its checkpoints[%s] are not consistent with group checkpoints[%s]"
-              + " or latest persisted sequences in metadata store[%s].",
-              sequenceCheckpoint.lhs,
-              sequenceCheckpoint.rhs,
-              taskGroup.checkpointSequences,
-              latestOffsetsFromDb
+              "Killing task as its checkpoints are not consistent with group checkpoints"
+              + " or latest persisted sequences in metadata store."
           );
           taskGroup.removeTask(sequenceCheckpoint.lhs);
         }
     );
   }
 
+  /**
+   * Checks if there is another {@link TaskGroup} publishing to any of the partitions
+   * that are being read by the given {@param taskGroup}. If this method returns
+   * true, it indicates that the current taskGroup would need to wait for the
+   * older taskGroups to finish publishing before it can publish its own offsets.
+   */
+  private boolean isAnotherTaskGroupPublishingToPartitionsOf(TaskGroup taskGroup)
+  {
+    final Set<PartitionIdType> partitionsPendingPublishFromOtherGroups =
+        pendingCompletionTaskGroups
+            .values()
+            .stream()
+            .flatMap(Collection::stream)
+            .filter(group -> !group.equals(taskGroup))
+            .flatMap(group -> group.startingSequences.keySet().stream())
+            .collect(Collectors.toSet());
+
+    if (partitionsPendingPublishFromOtherGroups.isEmpty()) {
+      return false;
+    }
+
+    for (PartitionIdType partitionId : taskGroup.startingSequences.keySet()) {
+      if (partitionsPendingPublishFromOtherGroups.contains(partitionId)) {
+        log.info(
+            "taskGroup[%s] with sequenceName[%s] needs to wait before publishing"
+            + " as another taskGroup is currently publishing to partition[%s].",

Review Comment:
   Can we also log the other taskGroup?
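One way to address this is to have the check return the conflicting group instead of a bare boolean, so the log line can name it. The sketch below is self-contained and is not the PR's code. `TaskGroupStub` (a group id plus the partitions it started reading) is a hypothetical stand-in for `TaskGroup`. `pendingCompletionTaskGroups` is modeled as a plain `Map<Integer, List<...>>`. `findOtherGroupPublishingTo` is a hypothetical name.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class PendingPublishCheck
{
  // Hypothetical stand-in for TaskGroup, holding only the fields this check needs.
  // equals() is not overridden, so the self-exclusion filter below compares by identity.
  public static final class TaskGroupStub
  {
    final int groupId;
    final Set<Integer> startingPartitions;

    public TaskGroupStub(int groupId, Set<Integer> startingPartitions)
    {
      this.groupId = groupId;
      this.startingPartitions = startingPartitions;
    }
  }

  /**
   * Returns some other pending-completion group whose starting partitions overlap
   * with those of {@code taskGroup}, or empty if there is none.
   */
  public static Optional<TaskGroupStub> findOtherGroupPublishingTo(
      Map<Integer, List<TaskGroupStub>> pendingCompletionTaskGroups,
      TaskGroupStub taskGroup
  )
  {
    return pendingCompletionTaskGroups
        .values()
        .stream()
        .flatMap(Collection::stream)
        .filter(group -> !group.equals(taskGroup))
        .filter(group -> group.startingPartitions.stream().anyMatch(taskGroup.startingPartitions::contains))
        .findFirst();
  }

  public static void main(String[] args)
  {
    TaskGroupStub publishing = new TaskGroupStub(0, Set.of(0, 1));
    TaskGroupStub waiting = new TaskGroupStub(1, Set.of(1, 2));
    Map<Integer, List<TaskGroupStub>> pending = Map.of(0, List.of(publishing));

    // The other group's id is now available for the message.
    findOtherGroupPublishingTo(pending, waiting).ifPresent(
        other -> System.out.printf(
            "taskGroup[%d] needs to wait before publishing as taskGroup[%d] is currently publishing to partitions[%s].%n",
            waiting.groupId,
            other.groupId,
            other.startingPartitions
        )
    );
  }
}
```

Returning an `Optional` keeps the call site close to the original boolean shape (`isPresent()`), while giving the caller the other group to include in the log line.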



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2641,8 +2649,8 @@ private void verifyAndMergeCheckpoints(
                                 .equals(taskGroup.checkpointSequences.firstEntry().getValue()))
             || taskCheckpoints.tailMap(taskGroup.checkpointSequences.firstKey()).size()
                != taskGroup.checkpointSequences.size()) {
-          log.debug(
-              "Adding task[%s] to kill list, checkpoints[%s], taskgroup checkpoints [%s]",
+          log.warn(
+              "Adding task[%s] to kill list as its checkpoints[%s] are not consistent with taskgroup checkpoints [%s]",

Review Comment:
   nit: `checkpoints[%s]` (no space before `[%s]`).
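The visible part of this condition flags a task in two cases: its checkpoint at the group's first sequence id differs from the group's first checkpoint, or it has a different number of checkpoints from that id onward. The hunk cuts off the start of the condition, so the following is only a sketch under assumptions. Integer sequence ids, Integer partitions, and Long offsets replace Druid's generic types. The missing half is assumed to be a `get(firstKey)` lookup. `CheckpointConsistency` and `isInconsistent` are hypothetical names.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;

public class CheckpointConsistency
{
  /**
   * Hypothetical version of the kill-list condition.
   * Keys are sequence ids; values map partition id to offset.
   * Precondition: groupCheckpoints is non-empty (firstKey() throws otherwise).
   */
  public static boolean isInconsistent(
      NavigableMap<Integer, Map<Integer, Long>> taskCheckpoints,
      NavigableMap<Integer, Map<Integer, Long>> groupCheckpoints
  )
  {
    final Integer firstKey = groupCheckpoints.firstKey();

    // Assumption: the truncated half of the condition looks up the task's checkpoint
    // at the group's first sequence id. A missing entry yields null, which does not
    // equal the group's (non-null) offsets, so the task is flagged.
    final Map<Integer, Long> taskFirst = taskCheckpoints.get(firstKey);
    final boolean firstDiffers = !Objects.equals(taskFirst, groupCheckpoints.firstEntry().getValue());

    // tailMap(firstKey, true) includes firstKey. Task checkpoints older than the group's
    // first one are ignored, but the count from firstKey onward must match exactly.
    // Only the first checkpoint's offsets are compared, as in the visible condition.
    final boolean countDiffers = taskCheckpoints.tailMap(firstKey, true).size() != groupCheckpoints.size();

    return firstDiffers || countDiffers;
  }

  public static void main(String[] args)
  {
    TreeMap<Integer, Map<Integer, Long>> group = new TreeMap<>();
    group.put(1, Map.of(0, 10L));
    group.put(2, Map.of(0, 20L));

    TreeMap<Integer, Map<Integer, Long>> task = new TreeMap<>(group);
    task.put(3, Map.of(0, 30L)); // one checkpoint more than the group has

    System.out.println(isInconsistent(task, group)); // prints "true"
  }
}
```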



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
