kfaraz commented on code in PR #16535:
URL: https://github.com/apache/druid/pull/16535#discussion_r1623829293
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3161,57 +3156,52 @@ private void checkTaskDuration() throws
ExecutionException, InterruptedException
final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>>
futures = new ArrayList<>();
final List<Integer> futureGroupIds = new ArrayList<>();
- boolean stopTasksEarly;
- if (earlyStopTime != null && (earlyStopTime.isBeforeNow() ||
earlyStopTime.isEqualNow())) {
- log.info("Early stop requested - signalling tasks to complete");
-
+ final boolean stopTasksEarly;
+ if (earlyStopTime != null && !earlyStopTime.isAfterNow()) {
+ log.info("Early stop requested, signalling tasks to complete.");
earlyStopTime = null;
stopTasksEarly = true;
} else {
stopTasksEarly = false;
}
- AtomicInteger stoppedTasks = new AtomicInteger();
+ final AtomicInteger numStoppedTasks = new AtomicInteger();
// Sort task groups by start time to prioritize early termination of
earlier groups, then iterate for processing
- activelyReadingTaskGroups
- .entrySet().stream().sorted(
+ activelyReadingTaskGroups.entrySet().stream().sorted(
Comparator.comparingLong(
- (Entry<Integer, TaskGroup> entry) ->
- computeEarliestTaskStartTime(entry.getValue())
- .getMillis()))
+ taskGroupEntry ->
computeEarliestTaskStartTime(taskGroupEntry.getValue()).getMillis()
+ )
+ )
.forEach(entry -> {
Integer groupId = entry.getKey();
TaskGroup group = entry.getValue();
- if (stopTasksEarly) {
+ final DateTime earliestTaskStart =
computeEarliestTaskStartTime(group);
+ final Duration runDuration =
Duration.millis(DateTimes.nowUtc().getMillis() - earliestTaskStart.getMillis());
+ if (stopTasksEarly || group.getHandoffEarly()) {
+ // If handoffEarly has been set, stop tasks irrespective of
stopTaskCount
log.info(
- "Stopping task group [%d] early. It has run for [%s]",
- groupId,
- ioConfig.getTaskDuration()
+ "Stopping taskGroup[%d] early after running for duration[%s].",
+ groupId, runDuration
);
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(group, true));
- } else {
- DateTime earliestTaskStart = computeEarliestTaskStartTime(group);
-
- if
(earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() ||
group.getHandoffEarly()) {
- // if this task has run longer than the configured duration
- // as long as the pending task groups are less than the
configured stop task count.
- // If shutdownEarly has been set, ignore stopTaskCount since
this is a manual operator action.
- if (pendingCompletionTaskGroups.values()
- .stream()
-
.mapToInt(CopyOnWriteArrayList::size)
- .sum() + stoppedTasks.get()
- < ioConfig.getMaxAllowedStops() || group.getHandoffEarly()) {
- log.info(
- "Task group [%d] has run for [%s]. Stopping.",
- groupId,
- ioConfig.getTaskDuration()
- );
- futureGroupIds.add(groupId);
- futures.add(checkpointTaskGroup(group, true));
- stoppedTasks.getAndIncrement();
- }
+ if (group.getHandoffEarly()) {
+ numStoppedTasks.getAndIncrement();
+ }
+ } else if
(earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
+ // Stop this task group if it has run longer than the configured
duration
+ // and the pending task groups are less than the configured stop
task count.
+ int numPendingCompletionTaskGroups =
pendingCompletionTaskGroups.values().stream()
+
.mapToInt(List::size).sum();
+ if (numPendingCompletionTaskGroups + numStoppedTasks.get() <
ioConfig.getMaxAllowedStops()) {
+ log.info(
+ "Stopping taskGroup[%d] as it has already run for
duration[%s], configured task duration[%s].",
Review Comment:
No, units are not required here: the object being printed is a `Duration`, which
is rendered in ISO-8601 style (e.g. `PT1800S`), so the unit is already part of the output.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]