This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1974a38bc90 Clean up allocation and supervisor logs for easier
debugging (#16535)
1974a38bc90 is described below
commit 1974a38bc90783e7ff2d38362509377001177047
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Jun 3 16:41:04 2024 +0530
Clean up allocation and supervisor logs for easier debugging (#16535)
Changes:
- Use the string `taskGroup` consistently in log messages so that a task group is easy to search for
- Clean up other logs
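- No functional change in logic (apart from a log call whose swapped arguments are corrected and one log lowered from info to debug)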
---
.../common/actions/SegmentAllocationQueue.java | 4 +-
.../druid/indexing/overlord/TaskLockbox.java | 2 +-
.../supervisor/SeekableStreamSupervisor.java | 107 +++++++++------------
3 files changed, 51 insertions(+), 62 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
index 6986ec683a5..98ab50cff78 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
@@ -268,7 +268,7 @@ public class SegmentAllocationQueue
catch (Throwable t) {
currentBatch.failPendingRequests(t);
processed = true;
- log.error(t, "Error while processing batch [%s]", currentBatch.key);
+ log.error(t, "Error while processing batch[%s].", currentBatch.key);
}
// Requeue if not fully processed yet
@@ -619,7 +619,7 @@ public class SegmentAllocationQueue
void failPendingRequests(Throwable cause)
{
if (!requestToFuture.isEmpty()) {
- log.warn("Failing [%d] requests in batch [%s], reason [%s].", size(),
cause.getMessage(), key);
+ log.warn("Failing [%d] requests in batch[%s], reason[%s].", size(),
key, cause.getMessage());
requestToFuture.values().forEach(future ->
future.completeExceptionally(cause));
requestToFuture.keySet().forEach(
request -> emitTaskMetric("task/action/failed/count", 1L, request)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 7776663330b..2155ac2c265 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -657,7 +657,7 @@ public class TaskLockbox
}
} else {
- log.info("Task[%s] already present in TaskLock[%s]", task.getId(),
posseToUse.getTaskLock().getGroupId());
+ log.debug("Task[%s] already present in TaskLock[%s].", task.getId(),
posseToUse.getTaskLock().getGroupId());
}
return posseToUse;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index d413a9bec3e..ec4de45cac7 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -83,6 +83,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -96,13 +97,12 @@ import
org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.joda.time.DateTime;
+import org.joda.time.Duration;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.IOException;
-import java.time.Duration;
-import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -1203,19 +1203,17 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
try {
- Instant handleNoticeStartTime = Instant.now();
+ final Stopwatch noticeHandleTime =
Stopwatch.createStarted();
notice.handle();
- Instant handleNoticeEndTime = Instant.now();
- Duration timeElapsed =
Duration.between(handleNoticeStartTime, handleNoticeEndTime);
String noticeType = notice.getType();
+ emitNoticeProcessTime(noticeType,
noticeHandleTime.millisElapsed());
if (log.isDebugEnabled()) {
log.debug(
- "Handled notice [%s] from notices queue in [%d] ms, "
- + "current notices queue size [%d] for
datasource [%s]",
- noticeType, timeElapsed.toMillis(),
getNoticesQueueSize(), dataSource
+ "Handled notice[%s] from notices queue in [%d] ms, "
+ + "current notices queue size [%d] for
datasource[%s].",
+ noticeType, noticeHandleTime.millisElapsed(),
getNoticesQueueSize(), dataSource
);
}
- emitNoticeProcessTime(noticeType, timeElapsed.toMillis());
}
catch (Throwable e) {
stateManager.recordThrowableEvent(e);
@@ -2837,10 +2835,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
earlyStopTime =
DateTimes.nowUtc().plus(tuningConfig.getRepartitionTransitionDuration());
log.info(
"Previous partition set [%s] has changed to [%s] - requesting
that tasks stop after [%s] at [%s]",
- previousPartitionIds,
- partitionIds,
- tuningConfig.getRepartitionTransitionDuration(),
- earlyStopTime
+ previousPartitionIds, partitionIds,
tuningConfig.getRepartitionTransitionDuration(), earlyStopTime
);
break;
}
@@ -3161,57 +3156,52 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>>
futures = new ArrayList<>();
final List<Integer> futureGroupIds = new ArrayList<>();
- boolean stopTasksEarly;
- if (earlyStopTime != null && (earlyStopTime.isBeforeNow() ||
earlyStopTime.isEqualNow())) {
- log.info("Early stop requested - signalling tasks to complete");
-
+ final boolean stopTasksEarly;
+ if (earlyStopTime != null && !earlyStopTime.isAfterNow()) {
+ log.info("Early stop requested, signalling tasks to complete.");
earlyStopTime = null;
stopTasksEarly = true;
} else {
stopTasksEarly = false;
}
- AtomicInteger stoppedTasks = new AtomicInteger();
+ final AtomicInteger numStoppedTasks = new AtomicInteger();
// Sort task groups by start time to prioritize early termination of
earlier groups, then iterate for processing
- activelyReadingTaskGroups
- .entrySet().stream().sorted(
+ activelyReadingTaskGroups.entrySet().stream().sorted(
Comparator.comparingLong(
- (Entry<Integer, TaskGroup> entry) ->
- computeEarliestTaskStartTime(entry.getValue())
- .getMillis()))
+ taskGroupEntry ->
computeEarliestTaskStartTime(taskGroupEntry.getValue()).getMillis()
+ )
+ )
.forEach(entry -> {
Integer groupId = entry.getKey();
TaskGroup group = entry.getValue();
- if (stopTasksEarly) {
+ final DateTime earliestTaskStart =
computeEarliestTaskStartTime(group);
+ final Duration runDuration =
Duration.millis(DateTimes.nowUtc().getMillis() - earliestTaskStart.getMillis());
+ if (stopTasksEarly || group.getHandoffEarly()) {
+ // If handoffEarly has been set, stop tasks irrespective of
stopTaskCount
log.info(
- "Stopping task group [%d] early. It has run for [%s]",
- groupId,
- ioConfig.getTaskDuration()
+ "Stopping taskGroup[%d] early after running for duration[%s].",
+ groupId, runDuration
);
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(group, true));
- } else {
- DateTime earliestTaskStart = computeEarliestTaskStartTime(group);
-
- if
(earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() ||
group.getHandoffEarly()) {
- // if this task has run longer than the configured duration
- // as long as the pending task groups are less than the
configured stop task count.
- // If shutdownEarly has been set, ignore stopTaskCount since
this is a manual operator action.
- if (pendingCompletionTaskGroups.values()
- .stream()
-
.mapToInt(CopyOnWriteArrayList::size)
- .sum() + stoppedTasks.get()
- < ioConfig.getMaxAllowedStops() || group.getHandoffEarly()) {
- log.info(
- "Task group [%d] has run for [%s]. Stopping.",
- groupId,
- ioConfig.getTaskDuration()
- );
- futureGroupIds.add(groupId);
- futures.add(checkpointTaskGroup(group, true));
- stoppedTasks.getAndIncrement();
- }
+ if (group.getHandoffEarly()) {
+ numStoppedTasks.getAndIncrement();
+ }
+ } else if
(earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
+ // Stop this task group if it has run longer than the configured
duration
+ // and the pending task groups are less than the configured stop
task count.
+ int numPendingCompletionTaskGroups =
pendingCompletionTaskGroups.values().stream()
+
.mapToInt(List::size).sum();
+ if (numPendingCompletionTaskGroups + numStoppedTasks.get() <
ioConfig.getMaxAllowedStops()) {
+ log.info(
+ "Stopping taskGroup[%d] as it has already run for
duration[%s], configured task duration[%s].",
+ groupId, runDuration, ioConfig.getTaskDuration()
+ );
+ futureGroupIds.add(groupId);
+ futures.add(checkpointTaskGroup(group, true));
+ numStoppedTasks.getAndIncrement();
}
}
});
@@ -3384,7 +3374,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if
(endOffsets.equals(taskGroup.checkpointSequences.lastEntry().getValue())) {
log.warn(
- "Checkpoint [%s] is same as the start sequences [%s] of
latest sequence for the task group [%d]",
+ "Checkpoint[%s] is same as the start sequences[%s] of
latest sequence for the taskGroup[%d].",
endOffsets,
taskGroup.checkpointSequences.lastEntry().getValue(),
taskGroup.groupId
@@ -3579,7 +3569,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// 2) Remove any tasks that have failed from the list
// 3) If any task completed successfully, stop all the tasks in this
group and move to the next group
- log.debug("Task group [%d] pre-pruning: %s", groupId,
taskGroup.taskIds());
+ log.debug("taskGroup[%d] pre-pruning: %s.", groupId,
taskGroup.taskIds());
Iterator<Entry<String, TaskData>> iTasks =
taskGroup.tasks.entrySet().iterator();
while (iTasks.hasNext()) {
@@ -3589,7 +3579,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// stop and remove bad tasks from the task group
if (!isTaskCurrent(groupId, taskId, activeTaskMap)) {
- log.info("Stopping task [%s] which does not match the expected
sequence range and ingestion spec", taskId);
+ log.info("Stopping task[%s] as it does not match the expected
sequence range and ingestion spec.", taskId);
futures.add(stopTask(taskId, false));
iTasks.remove();
continue;
@@ -3613,7 +3603,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
break;
}
}
- log.debug("Task group [%d] post-pruning: %s", groupId,
taskGroup.taskIds());
+ log.debug("After pruning, taskGroup[%d] has tasks[%s].", groupId,
taskGroup.taskIds());
}
// Ignore return value; just await.
@@ -3627,10 +3617,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
Map<PartitionIdType, SequenceOffsetType> latestSequencesFromStream =
getLatestSequencesFromStream();
- long nowTime = Instant.now().toEpochMilli();
- boolean idle;
- long idleTime;
-
+ final long nowTime = DateTimes.nowUtc().getMillis();
+ final boolean idle;
+ final long idleTime;
if (lastActiveTimeMillis > 0
&& previousSequencesFromStream.equals(latestSequencesFromStream)
&& computeTotalLag() == 0) {
@@ -3684,7 +3673,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// check that there is a current task group for each group of partitions
in [partitionGroups]
for (Integer groupId : partitionGroups.keySet()) {
if (!activelyReadingTaskGroups.containsKey(groupId)) {
- log.info("Creating new task group [%d] for partitions %s", groupId,
partitionGroups.get(groupId));
+ log.info("Creating new taskGroup[%d] for partitions[%s].", groupId,
partitionGroups.get(groupId));
Optional<DateTime> minimumMessageTime;
if (ioConfig.getLateMessageRejectionStartDateTime().isPresent()) {
minimumMessageTime =
Optional.of(ioConfig.getLateMessageRejectionStartDateTime().get());
@@ -3771,13 +3760,13 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (taskGroup.startingSequences == null ||
taskGroup.startingSequences.size() == 0 ||
taskGroup.startingSequences.values().stream().allMatch(x -> x ==
null || isEndOfShard(x))) {
- log.debug("Nothing to read in any partition for taskGroup [%d],
skipping task creation", groupId);
+ log.debug("Nothing to read in any partition for taskGroup[%d],
skipping task creation.", groupId);
continue;
}
if (ioConfig.getReplicas() > taskGroup.tasks.size()) {
log.info(
- "Number of tasks [%d] does not match configured numReplicas [%d]
in task group [%d], creating more tasks",
+ "Number of tasks[%d] does not match configured numReplicas[%d] in
taskGroup[%d], creating more tasks.",
taskGroup.tasks.size(), ioConfig.getReplicas(), groupId
);
createTasksForGroup(groupId, ioConfig.getReplicas() -
taskGroup.tasks.size());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]