This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1974a38bc90 Clean up allocation and supervisor logs for easier 
debugging (#16535)
1974a38bc90 is described below

commit 1974a38bc90783e7ff2d38362509377001177047
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Jun 3 16:41:04 2024 +0530

    Clean up allocation and supervisor logs for easier debugging (#16535)
    
    Changes:
    - Use the string "taskGroup" consistently in log messages so that a task group is easy to search for
    - Clean up other logs
    - No change in any logic
---
 .../common/actions/SegmentAllocationQueue.java     |   4 +-
 .../druid/indexing/overlord/TaskLockbox.java       |   2 +-
 .../supervisor/SeekableStreamSupervisor.java       | 107 +++++++++------------
 3 files changed, 51 insertions(+), 62 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
index 6986ec683a5..98ab50cff78 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
@@ -268,7 +268,7 @@ public class SegmentAllocationQueue
       catch (Throwable t) {
         currentBatch.failPendingRequests(t);
         processed = true;
-        log.error(t, "Error while processing batch [%s]", currentBatch.key);
+        log.error(t, "Error while processing batch[%s].", currentBatch.key);
       }
 
       // Requeue if not fully processed yet
@@ -619,7 +619,7 @@ public class SegmentAllocationQueue
     void failPendingRequests(Throwable cause)
     {
       if (!requestToFuture.isEmpty()) {
-        log.warn("Failing [%d] requests in batch [%s], reason [%s].", size(), 
cause.getMessage(), key);
+        log.warn("Failing [%d] requests in batch[%s], reason[%s].", size(), 
key, cause.getMessage());
         requestToFuture.values().forEach(future -> 
future.completeExceptionally(cause));
         requestToFuture.keySet().forEach(
             request -> emitTaskMetric("task/action/failed/count", 1L, request)
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 7776663330b..2155ac2c265 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -657,7 +657,7 @@ public class TaskLockbox
         }
 
       } else {
-        log.info("Task[%s] already present in TaskLock[%s]", task.getId(), 
posseToUse.getTaskLock().getGroupId());
+        log.debug("Task[%s] already present in TaskLock[%s].", task.getId(), 
posseToUse.getTaskLock().getGroupId());
       }
       return posseToUse;
     }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index d413a9bec3e..ec4de45cac7 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -83,6 +83,7 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -96,13 +97,12 @@ import 
org.apache.druid.segment.incremental.ParseExceptionReport;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.joda.time.DateTime;
+import org.joda.time.Duration;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
 import java.io.IOException;
-import java.time.Duration;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -1203,19 +1203,17 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                   }
 
                   try {
-                    Instant handleNoticeStartTime = Instant.now();
+                    final Stopwatch noticeHandleTime = 
Stopwatch.createStarted();
                     notice.handle();
-                    Instant handleNoticeEndTime = Instant.now();
-                    Duration timeElapsed = 
Duration.between(handleNoticeStartTime, handleNoticeEndTime);
                     String noticeType = notice.getType();
+                    emitNoticeProcessTime(noticeType, 
noticeHandleTime.millisElapsed());
                     if (log.isDebugEnabled()) {
                       log.debug(
-                          "Handled notice [%s] from notices queue in [%d] ms, "
-                              + "current notices queue size [%d] for 
datasource [%s]",
-                          noticeType, timeElapsed.toMillis(), 
getNoticesQueueSize(), dataSource
+                          "Handled notice[%s] from notices queue in [%d] ms, "
+                              + "current notices queue size [%d] for 
datasource[%s].",
+                          noticeType, noticeHandleTime.millisElapsed(), 
getNoticesQueueSize(), dataSource
                       );
                     }
-                    emitNoticeProcessTime(noticeType, timeElapsed.toMillis());
                   }
                   catch (Throwable e) {
                     stateManager.recordThrowableEvent(e);
@@ -2837,10 +2835,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           earlyStopTime = 
DateTimes.nowUtc().plus(tuningConfig.getRepartitionTransitionDuration());
           log.info(
               "Previous partition set [%s] has changed to [%s] - requesting 
that tasks stop after [%s] at [%s]",
-              previousPartitionIds,
-              partitionIds,
-              tuningConfig.getRepartitionTransitionDuration(),
-              earlyStopTime
+              previousPartitionIds, partitionIds, 
tuningConfig.getRepartitionTransitionDuration(), earlyStopTime
           );
           break;
         }
@@ -3161,57 +3156,52 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> 
futures = new ArrayList<>();
     final List<Integer> futureGroupIds = new ArrayList<>();
 
-    boolean stopTasksEarly;
-    if (earlyStopTime != null && (earlyStopTime.isBeforeNow() || 
earlyStopTime.isEqualNow())) {
-      log.info("Early stop requested - signalling tasks to complete");
-
+    final boolean stopTasksEarly;
+    if (earlyStopTime != null && !earlyStopTime.isAfterNow()) {
+      log.info("Early stop requested, signalling tasks to complete.");
       earlyStopTime = null;
       stopTasksEarly = true;
     } else {
       stopTasksEarly = false;
     }
 
-    AtomicInteger stoppedTasks = new AtomicInteger();
+    final AtomicInteger numStoppedTasks = new AtomicInteger();
     // Sort task groups by start time to prioritize early termination of 
earlier groups, then iterate for processing
-    activelyReadingTaskGroups
-        .entrySet().stream().sorted(
+    activelyReadingTaskGroups.entrySet().stream().sorted(
             Comparator.comparingLong(
-                (Entry<Integer, TaskGroup> entry) ->
-                    computeEarliestTaskStartTime(entry.getValue())
-                        .getMillis()))
+                taskGroupEntry -> 
computeEarliestTaskStartTime(taskGroupEntry.getValue()).getMillis()
+            )
+    )
         .forEach(entry -> {
           Integer groupId = entry.getKey();
           TaskGroup group = entry.getValue();
 
-          if (stopTasksEarly) {
+          final DateTime earliestTaskStart = 
computeEarliestTaskStartTime(group);
+          final Duration runDuration = 
Duration.millis(DateTimes.nowUtc().getMillis() - earliestTaskStart.getMillis());
+          if (stopTasksEarly || group.getHandoffEarly()) {
+            // If handoffEarly has been set, stop tasks irrespective of 
stopTaskCount
             log.info(
-                "Stopping task group [%d] early. It has run for [%s]",
-                groupId,
-                ioConfig.getTaskDuration()
+                "Stopping taskGroup[%d] early after running for duration[%s].",
+                groupId, runDuration
             );
             futureGroupIds.add(groupId);
             futures.add(checkpointTaskGroup(group, true));
-          } else {
-            DateTime earliestTaskStart = computeEarliestTaskStartTime(group);
-
-            if 
(earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || 
group.getHandoffEarly()) {
-              // if this task has run longer than the configured duration
-              // as long as the pending task groups are less than the 
configured stop task count.
-              // If shutdownEarly has been set, ignore stopTaskCount since 
this is a manual operator action.
-              if (pendingCompletionTaskGroups.values()
-                                             .stream()
-                                             
.mapToInt(CopyOnWriteArrayList::size)
-                                             .sum() + stoppedTasks.get()
-                  < ioConfig.getMaxAllowedStops() || group.getHandoffEarly()) {
-                log.info(
-                    "Task group [%d] has run for [%s]. Stopping.",
-                    groupId,
-                    ioConfig.getTaskDuration()
-                );
-                futureGroupIds.add(groupId);
-                futures.add(checkpointTaskGroup(group, true));
-                stoppedTasks.getAndIncrement();
-              }
+            if (group.getHandoffEarly()) {
+              numStoppedTasks.getAndIncrement();
+            }
+          } else if 
(earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
+            // Stop this task group if it has run longer than the configured 
duration
+            // and the pending task groups are less than the configured stop 
task count.
+            int numPendingCompletionTaskGroups = 
pendingCompletionTaskGroups.values().stream()
+                                                                            
.mapToInt(List::size).sum();
+            if (numPendingCompletionTaskGroups + numStoppedTasks.get() < 
ioConfig.getMaxAllowedStops()) {
+              log.info(
+                  "Stopping taskGroup[%d] as it has already run for 
duration[%s], configured task duration[%s].",
+                  groupId, runDuration, ioConfig.getTaskDuration()
+              );
+              futureGroupIds.add(groupId);
+              futures.add(checkpointTaskGroup(group, true));
+              numStoppedTasks.getAndIncrement();
             }
           }
         });
@@ -3384,7 +3374,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
               if 
(endOffsets.equals(taskGroup.checkpointSequences.lastEntry().getValue())) {
                 log.warn(
-                    "Checkpoint [%s] is same as the start sequences [%s] of 
latest sequence for the task group [%d]",
+                    "Checkpoint[%s] is same as the start sequences[%s] of 
latest sequence for the taskGroup[%d].",
                     endOffsets,
                     taskGroup.checkpointSequences.lastEntry().getValue(),
                     taskGroup.groupId
@@ -3579,7 +3569,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       //   2) Remove any tasks that have failed from the list
       //   3) If any task completed successfully, stop all the tasks in this 
group and move to the next group
 
-      log.debug("Task group [%d] pre-pruning: %s", groupId, 
taskGroup.taskIds());
+      log.debug("taskGroup[%d] pre-pruning: %s.", groupId, 
taskGroup.taskIds());
 
       Iterator<Entry<String, TaskData>> iTasks = 
taskGroup.tasks.entrySet().iterator();
       while (iTasks.hasNext()) {
@@ -3589,7 +3579,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
         // stop and remove bad tasks from the task group
         if (!isTaskCurrent(groupId, taskId, activeTaskMap)) {
-          log.info("Stopping task [%s] which does not match the expected 
sequence range and ingestion spec", taskId);
+          log.info("Stopping task[%s] as it does not match the expected 
sequence range and ingestion spec.", taskId);
           futures.add(stopTask(taskId, false));
           iTasks.remove();
           continue;
@@ -3613,7 +3603,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           break;
         }
       }
-      log.debug("Task group [%d] post-pruning: %s", groupId, 
taskGroup.taskIds());
+      log.debug("After pruning, taskGroup[%d] has tasks[%s].", groupId, 
taskGroup.taskIds());
     }
 
     // Ignore return value; just await.
@@ -3627,10 +3617,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
 
     Map<PartitionIdType, SequenceOffsetType> latestSequencesFromStream = 
getLatestSequencesFromStream();
-    long nowTime = Instant.now().toEpochMilli();
-    boolean idle;
-    long idleTime;
-
+    final long nowTime = DateTimes.nowUtc().getMillis();
+    final boolean idle;
+    final long idleTime;
     if (lastActiveTimeMillis > 0
         && previousSequencesFromStream.equals(latestSequencesFromStream)
         && computeTotalLag() == 0) {
@@ -3684,7 +3673,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     // check that there is a current task group for each group of partitions 
in [partitionGroups]
     for (Integer groupId : partitionGroups.keySet()) {
       if (!activelyReadingTaskGroups.containsKey(groupId)) {
-        log.info("Creating new task group [%d] for partitions %s", groupId, 
partitionGroups.get(groupId));
+        log.info("Creating new taskGroup[%d] for partitions[%s].", groupId, 
partitionGroups.get(groupId));
         Optional<DateTime> minimumMessageTime;
         if (ioConfig.getLateMessageRejectionStartDateTime().isPresent()) {
           minimumMessageTime = 
Optional.of(ioConfig.getLateMessageRejectionStartDateTime().get());
@@ -3771,13 +3760,13 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       if (taskGroup.startingSequences == null ||
           taskGroup.startingSequences.size() == 0 ||
           taskGroup.startingSequences.values().stream().allMatch(x -> x == 
null || isEndOfShard(x))) {
-        log.debug("Nothing to read in any partition for taskGroup [%d], 
skipping task creation", groupId);
+        log.debug("Nothing to read in any partition for taskGroup[%d], 
skipping task creation.", groupId);
         continue;
       }
 
       if (ioConfig.getReplicas() > taskGroup.tasks.size()) {
         log.info(
-            "Number of tasks [%d] does not match configured numReplicas [%d] 
in task group [%d], creating more tasks",
+            "Number of tasks[%d] does not match configured numReplicas[%d] in 
taskGroup[%d], creating more tasks.",
             taskGroup.tasks.size(), ioConfig.getReplicas(), groupId
         );
         createTasksForGroup(groupId, ioConfig.getReplicas() - 
taskGroup.tasks.size());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to