gargvishesh commented on code in PR #16535:
URL: https://github.com/apache/druid/pull/16535#discussion_r1623791899


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -657,7 +657,7 @@ private TaskLockPosse createOrFindLockPosse(LockRequest 
request, Task task, bool
         }
 
       } else {
-        log.info("Task[%s] already present in TaskLock[%s]", task.getId(), 
posseToUse.getTaskLock().getGroupId());
+        log.debug("Task[%s] already present in TaskLock[%s]", task.getId(), 
posseToUse.getTaskLock().getGroupId());

Review Comment:
   ```suggestion
           log.debug("Task[%s] already present in TaskLock[%s].", task.getId(), posseToUse.getTaskLock().getGroupId());
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3384,7 +3381,7 @@ public Map<PartitionIdType, SequenceOffsetType> 
apply(List<Either<Throwable, Map
 
               if 
(endOffsets.equals(taskGroup.checkpointSequences.lastEntry().getValue())) {
                 log.warn(
-                    "Checkpoint [%s] is same as the start sequences [%s] of 
latest sequence for the task group [%d]",
+                    "Checkpoint[%s] is same as the start sequences[%s] of 
latest sequence for the taskGroup[%d]",

Review Comment:
   ```suggestion
                       "Checkpoint[%s] is same as the start sequences[%s] of latest sequence for the taskGroup[%d].",
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java:
##########
@@ -268,7 +268,7 @@ private void processBatchesDue()
       catch (Throwable t) {
         currentBatch.failPendingRequests(t);
         processed = true;
-        log.error(t, "Error while processing batch [%s]", currentBatch.key);
+        log.error(t, "Error while processing batch[%s]", currentBatch.key);

Review Comment:
   ```suggestion
           log.error(t, "Error while processing batch[%s].", currentBatch.key);
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3579,7 +3569,7 @@ private void checkCurrentTaskState() throws 
ExecutionException, InterruptedExcep
       //   2) Remove any tasks that have failed from the list
       //   3) If any task completed successfully, stop all the tasks in this 
group and move to the next group
 
-      log.debug("Task group [%d] pre-pruning: %s", groupId, 
taskGroup.taskIds());
+      log.debug("taskGroup[%d] pre-pruning: %s", groupId, taskGroup.taskIds());

Review Comment:
   ```suggestion
         log.debug("taskGroup[%d] pre-pruning: %s.", groupId, taskGroup.taskIds());
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3161,57 +3156,52 @@ private void checkTaskDuration() throws 
ExecutionException, InterruptedException
     final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> 
futures = new ArrayList<>();
     final List<Integer> futureGroupIds = new ArrayList<>();
 
-    boolean stopTasksEarly;
-    if (earlyStopTime != null && (earlyStopTime.isBeforeNow() || 
earlyStopTime.isEqualNow())) {
-      log.info("Early stop requested - signalling tasks to complete");
-
+    final boolean stopTasksEarly;
+    if (earlyStopTime != null && !earlyStopTime.isAfterNow()) {
+      log.info("Early stop requested, signalling tasks to complete.");
       earlyStopTime = null;
       stopTasksEarly = true;
     } else {
       stopTasksEarly = false;
     }
 
-    AtomicInteger stoppedTasks = new AtomicInteger();
+    final AtomicInteger numStoppedTasks = new AtomicInteger();
     // Sort task groups by start time to prioritize early termination of 
earlier groups, then iterate for processing
-    activelyReadingTaskGroups
-        .entrySet().stream().sorted(
+    activelyReadingTaskGroups.entrySet().stream().sorted(
             Comparator.comparingLong(
-                (Entry<Integer, TaskGroup> entry) ->
-                    computeEarliestTaskStartTime(entry.getValue())
-                        .getMillis()))
+                taskGroupEntry -> 
computeEarliestTaskStartTime(taskGroupEntry.getValue()).getMillis()
+            )
+    )
         .forEach(entry -> {
           Integer groupId = entry.getKey();
           TaskGroup group = entry.getValue();
 
-          if (stopTasksEarly) {
+          final DateTime earliestTaskStart = 
computeEarliestTaskStartTime(group);
+          final Duration runDuration = 
Duration.millis(DateTimes.nowUtc().getMillis() - earliestTaskStart.getMillis());
+          if (stopTasksEarly || group.getHandoffEarly()) {
+            // If handoffEarly has been set, stop tasks irrespective of 
stopTaskCount
             log.info(
-                "Stopping task group [%d] early. It has run for [%s]",
-                groupId,
-                ioConfig.getTaskDuration()
+                "Stopping taskGroup[%d] early after running for duration[%s].",
+                groupId, runDuration
             );
             futureGroupIds.add(groupId);
             futures.add(checkpointTaskGroup(group, true));
-          } else {
-            DateTime earliestTaskStart = computeEarliestTaskStartTime(group);
-
-            if 
(earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || 
group.getHandoffEarly()) {
-              // if this task has run longer than the configured duration
-              // as long as the pending task groups are less than the 
configured stop task count.
-              // If shutdownEarly has been set, ignore stopTaskCount since 
this is a manual operator action.
-              if (pendingCompletionTaskGroups.values()
-                                             .stream()
-                                             
.mapToInt(CopyOnWriteArrayList::size)
-                                             .sum() + stoppedTasks.get()
-                  < ioConfig.getMaxAllowedStops() || group.getHandoffEarly()) {
-                log.info(
-                    "Task group [%d] has run for [%s]. Stopping.",
-                    groupId,
-                    ioConfig.getTaskDuration()
-                );
-                futureGroupIds.add(groupId);
-                futures.add(checkpointTaskGroup(group, true));
-                stoppedTasks.getAndIncrement();
-              }
+            if (group.getHandoffEarly()) {
+              numStoppedTasks.getAndIncrement();
+            }
+          } else if 
(earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
+            // Stop this task group if it has run longer than the configured 
duration
+            // and the pending task groups are less than the configured stop 
task count.
+            int numPendingCompletionTaskGroups = 
pendingCompletionTaskGroups.values().stream()
+                                                                            
.mapToInt(List::size).sum();
+            if (numPendingCompletionTaskGroups + numStoppedTasks.get() < 
ioConfig.getMaxAllowedStops()) {
+              log.info(
+                  "Stopping taskGroup[%d] as it has already run for 
duration[%s], configured task duration[%s].",
+                  groupId, runDuration, ioConfig.getTaskDuration()
+              );
+              futureGroupIds.add(groupId);
+              futures.add(checkpointTaskGroup(group, true));
+              numStoppedTasks.getAndIncrement();
             }
           }

Review Comment:
   Are units required here? (Presumably this refers to the `duration[%s]` values logged in the new messages above.)



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3771,13 +3767,13 @@ private void createNewTasks() throws 
JsonProcessingException
       if (taskGroup.startingSequences == null ||
           taskGroup.startingSequences.size() == 0 ||
           taskGroup.startingSequences.values().stream().allMatch(x -> x == 
null || isEndOfShard(x))) {
-        log.debug("Nothing to read in any partition for taskGroup [%d], 
skipping task creation", groupId);
+        log.debug("Nothing to read in any partition for taskGroup[%d], 
skipping task creation", groupId);

Review Comment:
   ```suggestion
           log.debug("Nothing to read in any partition for taskGroup[%d], skipping task creation.", groupId);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to