clintropolis commented on a change in pull request #8870: Additional Kinesis
resharding fixes
URL: https://github.com/apache/incubator-druid/pull/8870#discussion_r351531805
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -2271,9 +2351,29 @@ private void checkTaskDuration() throws
ExecutionException, InterruptedException
group.completionTimeout =
DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new
CopyOnWriteArrayList<>()).add(group);
- // set endOffsets as the next startOffsets
+
+ boolean endOffsetsAreInvalid = false;
for (Entry<PartitionIdType, SequenceOffsetType> entry :
endOffsets.entrySet()) {
- partitionGroups.get(groupId).put(entry.getKey(), entry.getValue());
+ if (entry.getValue().equals(getEndOfPartitionMarker())) {
+ log.info(
+ "Got end of partition marker for partition [%s] in
checkTaskDuration, not updating partition offset.",
+ entry.getKey()
+ );
+ endOffsetsAreInvalid = true;
+ }
+ }
+
+ // set endOffsets as the next startOffsets
+ // If we received invalid endOffset values, we clear the known offset
to refetch the last committed offset
+ // from metadata.
+ if (!endOffsetsAreInvalid) {
Review comment:
could you add a comment suggesting that we are considering all invalid if
any invalid as a safety measure?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]