FrankChen021 commented on code in PR #19372:
URL: https://github.com/apache/druid/pull/19372#discussion_r3201843040


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4347,6 +4546,175 @@ && checkSourceMetadataMatch(dataSourceMetadata)) {
     return Collections.emptyMap();
   }
 
+  /**
+   * Check if all partitions in a task group have reached their bounded end offsets.
+   * Used to determine if the task group completed successfully vs failed midway.
+   *
+   * @param groupId The task group ID to check
+   * @return true if all partitions in the group have reached their end offsets, false otherwise
+   */
+  private boolean hasTaskGroupReachedBoundedEnd(int groupId)
+  {
+    BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+    Map<PartitionIdType, SequenceOffsetType> startOffsets =
+        convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers());
+    Map<PartitionIdType, SequenceOffsetType> endOffsets =
+        convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+
+    Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+    if (partitionsInGroup == null || partitionsInGroup.isEmpty()) {
+      return false;
+    }
+
+    // Check if all partitions have empty ranges
+    // For exclusive end offsets (Kafka): start >= end means empty
+    // For inclusive end offsets (Kinesis): only start > end means empty (start == end is one record)
+    boolean allPartitionsEmptyRange = true;
+    for (PartitionIdType partition : partitionsInGroup) {
+      SequenceOffsetType start = startOffsets.get(partition);
+      SequenceOffsetType end = endOffsets.get(partition);
+
+      boolean isEmpty;
+      if (isEndOffsetExclusive()) {
+        // Exclusive: empty if start >= end
+        isEmpty = isOffsetAtOrBeyond(start, end);
+      } else {
+        // Inclusive: empty only if start > end
+        isEmpty = isOffsetAtOrBeyond(start, end) && !start.equals(end);
+      }
+
+      if (!isEmpty) {
+        allPartitionsEmptyRange = false;
+        break;
+      }
+    }
+
+    if (allPartitionsEmptyRange) {
+      log.warn(
+          "TaskGroup[%d] has empty range for all partitions (start %s end). "
+          + "No work to do, marking as complete. Start: %s, End: %s",
+          groupId,
+          isEndOffsetExclusive() ? ">=" : ">",
+          startOffsets,
+          endOffsets
+      );
+      return true;
+    }
+
+    Map<PartitionIdType, SequenceOffsetType> currentOffsets = getOffsetsFromMetadataStorage();
+
+    log.info(
+        "Bounded mode: checking completion for taskGroup[%d]. Current offsets from metadata: %s, End offsets: %s",
+        groupId,
+        currentOffsets,
+        endOffsets
+    );
+
+    if (currentOffsets == null || currentOffsets.isEmpty()) {
+      log.debug("No checkpointed offsets found, taskGroup[%d] has not completed", groupId);
+      return false; // No progress yet, task hasn't completed
+    }
+
+    // Check if ALL partitions in this group have reached their end offsets
+    for (PartitionIdType partition : partitionsInGroup) {
+      SequenceOffsetType endOffset = endOffsets.get(partition);
+      SequenceOffsetType currentOffset = currentOffsets.get(partition);
+
+      if (currentOffset == null) {
+        log.debug(
+            "Partition[%s] in taskGroup[%d] has no checkpointed offset, not complete",
+            partition,
+            groupId
+        );
+        return false; // Partition hasn't started processing
+      }
+
+      if (!isOffsetAtOrBeyond(currentOffset, endOffset)) {
+        log.debug(
+            "Partition[%s] in taskGroup[%d] at offset[%s], has not reached end[%s]",
+            partition,
+            groupId,
+            currentOffset,
+            endOffset
+        );
+        return false; // This partition hasn't reached its end
+      }
+    }
+
+    log.info(
+        "All partitions in taskGroup[%d] have reached their end offsets",
+        groupId
+    );
+    return true; // All partitions have reached their end offsets

Review Comment:
   [P1] Validate bounded metadata before completing from old offsets
   
A new bounded supervisor can silently complete using offsets committed by a
previous bounded run that had a different boundedStreamConfig. The mismatch
check only runs in getOffsetFromStorageForPartition(), but
isBoundedWorkComplete() calls hasTaskGroupReachedBoundedEnd() first, which reads
the raw metadata offsets and returns true whenever currentOffset >= the new end
offset. For example, after a run over [0,100] commits offset 100, posting
[50,75] for the same datasource reaches COMPLETED without running any tasks and
without throwing the documented metadata mismatch error. Validate the stored
bounded config before treating metadata offsets as completion, or ignore prior
offsets when the bounded config differs.
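
A rough sketch of the kind of guard I have in mind, not a drop-in patch. It assumes the start/end range that produced the committed offsets can be read back next to them; `BoundedCompletionSketch`, `storedStart`, `storedEnd` and `committedOffsets` are hypothetical names, and `Integer`/`Long` stand in for `PartitionIdType`/`SequenceOffsetType`. The only point is the ordering: compare the stored range with the requested one before comparing offsets.

```java
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical stand-in; none of these names are Druid APIs.
final class BoundedCompletionSketch
{
  private BoundedCompletionSketch() {}

  // True only if the range recorded with the committed offsets is the same
  // range the current supervisor spec asks for.
  static boolean storedRangeMatches(
      Map<Integer, Long> requestedStart,
      Map<Integer, Long> requestedEnd,
      Map<Integer, Long> storedStart,
      Map<Integer, Long> storedEnd
  )
  {
    return storedStart != null
           && storedEnd != null
           && storedStart.equals(requestedStart)
           && storedEnd.equals(requestedEnd);
  }

  static boolean hasReachedBoundedEnd(
      Set<Integer> partitionsInGroup,
      Map<Integer, Long> requestedStart,
      Map<Integer, Long> requestedEnd,
      Map<Integer, Long> storedStart,
      Map<Integer, Long> storedEnd,
      Map<Integer, Long> committedOffsets
  )
  {
    if (partitionsInGroup == null || partitionsInGroup.isEmpty()) {
      return false;
    }
    // Guard first: offsets left behind by a different bounded run say nothing
    // about the range requested now, so they must not count as completion.
    if (!storedRangeMatches(requestedStart, requestedEnd, storedStart, storedEnd)) {
      return false;
    }
    if (committedOffsets == null) {
      return false;
    }
    for (Integer partition : partitionsInGroup) {
      Long current = committedOffsets.get(partition);
      Long end = requestedEnd.get(partition);
      if (current == null || end == null || current < end) {
        return false;
      }
    }
    return true;
  }
}
```

With the [0,100] then [50,75] case above, the stored range differs from the requested one, so the check returns false and the stale offset 100 is never compared against the new end of 75. Throwing the metadata mismatch error at that point instead of returning false would also work; which one fits better depends on how the supervisor should surface the conflict.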



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

