capistrant commented on code in PR #19091:
URL: https://github.com/apache/druid/pull/19091#discussion_r2890753343
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2669,18 +2677,50 @@ private void verifyAndMergeCheckpoints(
sequenceCheckpoint -> {
killTask(
sequenceCheckpoint.lhs,
- "Killing task[%s], as its checkpoints[%s] are not consistent with group checkpoints[%s]"
- + " or latest persisted sequences in metadata store[%s].",
- sequenceCheckpoint.lhs,
- sequenceCheckpoint.rhs,
- taskGroup.checkpointSequences,
- latestOffsetsFromDb
+ "Killing task as its checkpoints are not consistent with group checkpoints"
+ + " or latest persisted sequences in metadata store."
);
taskGroup.removeTask(sequenceCheckpoint.lhs);
}
);
}
+ /**
+ * Checks if there is another {@link TaskGroup} publishing to any of the partitions
+ * that are being read by the given {@param taskGroup}. If this method returns
+ * true, it indicates that the current taskGroup would need to wait for the
+ * older taskGroups to finish publishing before it can publish its own offsets.
+ */
+ private boolean isAnotherTaskGroupPublishingToPartitionsOf(TaskGroup taskGroup)
+ {
+ final Set<PartitionIdType> partitionsPendingPublishFromOtherGroups =
+ pendingCompletionTaskGroups
+ .values()
+ .stream()
+ .flatMap(Collection::stream)
+ .filter(group -> !group.equals(taskGroup))
Review Comment:
TaskGroup doesn't override the equals method, so this filter falls back to
reference equality. Can we compare by group id or add an equals override? Or am
I missing the point here, and reference equality is fine?
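
To make the question concrete, here is a minimal sketch of how the two filters can differ. The `Group` class and its `groupId` field are hypothetical stand-ins, not the real TaskGroup. The sketch only shows what happens if two distinct instances ever carry the same id, and says nothing about whether that can occur in the supervisor.

```java
import java.util.List;
import java.util.stream.Collectors;

public class TaskGroupEqualitySketch
{
  // Hypothetical stand-in for TaskGroup: it does not override equals() or
  // hashCode(), so equals() is Object's reference comparison.
  static final class Group
  {
    final int groupId;

    Group(int groupId)
    {
      this.groupId = groupId;
    }
  }

  // Same shape as the filter in the diff: relies on the inherited equals().
  static List<Group> othersByReference(List<Group> groups, Group self)
  {
    return groups.stream()
                 .filter(group -> !group.equals(self))
                 .collect(Collectors.toList());
  }

  // Alternative raised in the review: compare by group id explicitly.
  static List<Group> othersById(List<Group> groups, Group self)
  {
    return groups.stream()
                 .filter(group -> group.groupId != self.groupId)
                 .collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    Group current = new Group(0);
    Group sameIdOtherInstance = new Group(0);
    Group other = new Group(1);
    List<Group> pending = List.of(sameIdOtherInstance, other);

    // Reference equality keeps both entries, because neither is `current` itself.
    System.out.println(othersByReference(pending, current).size());
    // Id comparison drops the entry that shares the current group's id.
    System.out.println(othersById(pending, current).size());
  }
}
```

If the same instance is always the one stored in the pending map, both filters behave identically and reference equality is fine.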
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java:
##########
@@ -33,8 +33,12 @@
public abstract class TransactionalSegmentPublisher
{
- private static final int QUIET_RETRIES = 3;
- private static final int MAX_RETRIES = 5;
+ private static final int QUIET_RETRIES = 5;
+
+ /**
+ * Approximately 10 minutes of retrying using {@link RetryUtils#nextRetrySleepMillis(int)}.
+ */
+ private static final int MAX_RETRIES = 13;
Review Comment:
Any concern that an operation that can never succeed will now take about 10
minutes to fail here? I'm not sure what we could do about it anyway, though. If
this is needed to stabilize auto-scaling, then I guess it is worth it.
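
For a rough feel of the worst-case wait, the total sleep time can be estimated with a small sketch. It assumes capped exponential backoff with a 1 second base, doubling on each try, a 60 second cap, and no jitter. Those parameters and all names below are assumptions for illustration and are not taken from the diff. The actual behavior of RetryUtils#nextRetrySleepMillis may differ, and wall-clock time also includes the duration of each failed attempt.

```java
public class RetryBudgetSketch
{
  // Assumed backoff parameters, not read from the Druid source.
  static final long BASE_SLEEP_MILLIS = 1_000L;
  static final long MAX_SLEEP_MILLIS = 60_000L;

  // Sleep before the next attempt after the nTry-th failure (nTry starts at 1),
  // doubling each time and capped at MAX_SLEEP_MILLIS. Jitter is ignored.
  static long sleepMillisAfterTry(int nTry)
  {
    double uncapped = BASE_SLEEP_MILLIS * Math.pow(2, nTry - 1);
    return (long) Math.min((double) MAX_SLEEP_MILLIS, uncapped);
  }

  // Total time spent sleeping across the given number of sleeps.
  static long totalSleepMillis(int sleeps)
  {
    long total = 0;
    for (int nTry = 1; nTry <= sleeps; nTry++) {
      total += sleepMillisAfterTry(nTry);
    }
    return total;
  }

  public static void main(String[] args)
  {
    // Whether a given retry limit yields one fewer sleep than attempts is
    // itself an assumption, so print a few nearby values for comparison.
    for (int sleeps = 11; sleeps <= 14; sleeps++) {
      System.out.println(sleeps + " sleeps: " + totalSleepMillis(sleeps) / 1000 + " s");
    }
  }
}
```

Under these assumptions, every sleep past the sixth adds a full minute, so each extra retry on a permanently failing publish costs about 60 seconds.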
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]